From 9eb3dc021dc8839ad4f41b7f50e4b23fe7e84743 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Thu, 11 May 2023 11:04:07 -0400 Subject: [PATCH 1/2] changefeedccl: use new parquet library This change updates changefeeds to use the new parquet library added in `pkg/util/parquet` when using `format=parquet`. Informs: https://github.com/cockroachdb/cockroach/issues/99028 Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071 Release note: None --- pkg/ccl/changefeedccl/BUILD.bazel | 5 - pkg/ccl/changefeedccl/changefeed_test.go | 1 - .../changefeedccl/changefeedbase/options.go | 43 ++-- .../changefeedbase/options_test.go | 1 - pkg/ccl/changefeedccl/encoder_test.go | 16 +- pkg/ccl/changefeedccl/helpers_test.go | 18 +- pkg/ccl/changefeedccl/parquet.go | 69 +++++- .../parquet_sink_cloudstorage.go | 192 +++------------ pkg/ccl/changefeedccl/parquet_test.go | 5 +- pkg/ccl/changefeedccl/sink.go | 7 +- pkg/ccl/changefeedccl/sink_cloudstorage.go | 19 +- .../changefeedccl/sink_cloudstorage_test.go | 48 ++-- pkg/ccl/changefeedccl/testfeed_test.go | 219 ++++++------------ pkg/ccl/changefeedccl/testing_knobs.go | 4 + pkg/util/parquet/schema.go | 7 + pkg/util/parquet/testutils.go | 1 + 16 files changed, 259 insertions(+), 396 deletions(-) diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 9286954617c9..fb5ad09d6b23 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -96,7 +96,6 @@ go_library( "//pkg/sql/execinfrapb", "//pkg/sql/exprutil", "//pkg/sql/flowinfra", - "//pkg/sql/importer", "//pkg/sql/isql", "//pkg/sql/parser", "//pkg/sql/pgwire/pgcode", @@ -154,9 +153,6 @@ go_library( "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_redact//:redact", - "@com_github_fraugster_parquet_go//:parquet-go", - "@com_github_fraugster_parquet_go//parquet", - "@com_github_fraugster_parquet_go//parquetschema", "@com_github_gogo_protobuf//jsonpb", "@com_github_gogo_protobuf//types", "@com_github_google_btree//:btree", @@ -317,7 +313,6 @@ go_test( "@com_github_cockroachdb_cockroach_go_v2//crdb", "@com_github_cockroachdb_errors//:errors", "@com_github_dustin_go_humanize//:go-humanize", - "@com_github_fraugster_parquet_go//:parquet-go", "@com_github_gogo_protobuf//types", "@com_github_jackc_pgx_v4//:pgx", "@com_github_lib_pq//:pq", diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index e5d5bb078add..14cf9f84c17b 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -7668,7 +7668,6 @@ func TestChangefeedPredicateWithSchemaChange(t *testing.T) { defer log.Scope(t).Close(t) skip.UnderRace(t, "takes too long under race") - defer TestingSetIncludeParquetMetadata()() setupSQL := []string{ `CREATE TYPE status AS ENUM ('open', 'closed', 'inactive')`, diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index 413e824286f8..fd4419160dcb 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -423,18 +423,31 @@ var NoLongerExperimental = map[string]string{ DeprecatedSinkSchemeCloudStorageS3: SinkSchemeCloudStorageS3, } +// OptionsSet is a test of changefeed option strings. 
+type OptionsSet map[string]struct{} + // InitialScanOnlyUnsupportedOptions is options that are not supported with the // initial scan only option -var InitialScanOnlyUnsupportedOptions = makeStringSet(OptEndTime, OptResolvedTimestamps, OptDiff, +var InitialScanOnlyUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptResolvedTimestamps, OptDiff, OptMVCCTimestamps, OptUpdatedTimestamps) +// ParquetFormatUnsupportedOptions is options that are not supported with the +// parquet format. +// +// OptKeyInValue is disallowed because parquet files have no concept of key +// columns, so there is no reason to emit duplicate key datums. +// +// TODO(#103129): add support for some of these +var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptResolvedTimestamps, OptDiff, + OptMVCCTimestamps, OptUpdatedTimestamps, OptKeyInValue) + // AlterChangefeedUnsupportedOptions are changefeed options that we do not allow // users to alter. // TODO(sherman): At the moment we disallow altering both the initial_scan_only // and the end_time option. However, there are instances in which it should be // allowed to alter either of these options. We need to support the alteration // of these fields. -var AlterChangefeedUnsupportedOptions = makeStringSet(OptCursor, OptInitialScan, +var AlterChangefeedUnsupportedOptions OptionsSet = makeStringSet(OptCursor, OptInitialScan, OptNoInitialScan, OptInitialScanOnly, OptEndTime) // AlterChangefeedOptionExpectValues is used to parse alter changefeed options @@ -1039,16 +1052,21 @@ func (s StatementOptions) ValidateForCreateChangefeed(isPredicateChangefeed bool if err != nil { return err } - validateInitialScanUnsupportedOptions := func(errMsg string) error { - for o := range InitialScanOnlyUnsupportedOptions { + + // validateUnsupportedOptions returns an error if any of the supplied are + // in the statement options. The error string should be the string + // representation of the option (ex. "key_in_value", or "initial_scan='only'"). + validateUnsupportedOptions := func(unsupportedOptions OptionsSet, errorStr string) error { + for o := range unsupportedOptions { if _, ok := s.m[o]; ok { - return errors.Newf(`cannot specify both %s='only' and %s`, OptInitialScan, o) + return errors.Newf(`cannot specify both %s and %s`, errorStr, o) } } return nil } if scanType == OnlyInitialScan { - if err := validateInitialScanUnsupportedOptions(fmt.Sprintf("%s='only'", OptInitialScan)); err != nil { + if err := validateUnsupportedOptions(InitialScanOnlyUnsupportedOptions, + fmt.Sprintf("%s='only'", OptInitialScan)); err != nil { return err } } else { @@ -1058,17 +1076,8 @@ func (s StatementOptions) ValidateForCreateChangefeed(isPredicateChangefeed bool } // Right now parquet does not support any of these options if s.m[OptFormat] == string(OptFormatParquet) { - if isPredicateChangefeed { - // Diff option is allowed when using predicate changefeeds with parquet format. 
- for o := range InitialScanOnlyUnsupportedOptions { - if _, ok := s.m[o]; ok && o != OptDiff { - return errors.Newf(`cannot specify both format='%s' and %s`, OptFormatParquet, o) - } - } - } else { - if err := validateInitialScanUnsupportedOptions(string(OptFormatParquet)); err != nil { - return err - } + if err := validateUnsupportedOptions(ParquetFormatUnsupportedOptions, fmt.Sprintf("format=%s", OptFormatParquet)); err != nil { + return err } } for o := range s.m { diff --git a/pkg/ccl/changefeedccl/changefeedbase/options_test.go b/pkg/ccl/changefeedccl/changefeedbase/options_test.go index 524dec0c231b..4eb5c62a0dbc 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options_test.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options_test.go @@ -40,7 +40,6 @@ func TestOptionsValidations(t *testing.T) { {map[string]string{"initial_scan_only": "", "resolved": ""}, true, "cannot specify both initial_scan='only'"}, {map[string]string{"initial_scan_only": "", "resolved": ""}, true, "cannot specify both initial_scan='only'"}, {map[string]string{"key_column": "b"}, false, "requires the unordered option"}, - {map[string]string{"diff": "", "format": "parquet"}, true, ""}, } for _, test := range tests { diff --git a/pkg/ccl/changefeedccl/encoder_test.go b/pkg/ccl/changefeedccl/encoder_test.go index d544088eb5be..e92ffbbafacc 100644 --- a/pkg/ccl/changefeedccl/encoder_test.go +++ b/pkg/ccl/changefeedccl/encoder_test.go @@ -1068,8 +1068,6 @@ func TestParquetEncoder(t *testing.T) { defer log.Scope(t).Close(t) testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { - defer TestingSetIncludeParquetMetadata()() - tests := []struct { name string changefeedStmt string @@ -1089,22 +1087,22 @@ func TestParquetEncoder(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { sqlDB := sqlutils.MakeSQLRunner(s.DB) - sqlDB.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY, x STRING, y INT, z FLOAT NOT NULL, a BOOL)`) + sqlDB.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY, x STRING, y INT, z FLOAT NOT NULL, a BOOL, c INT[])`) defer sqlDB.Exec(t, `DROP TABLE FOO`) - sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'Alice', 3, 0.5032135844230652, true), (2, 'Bob', - 2, CAST('nan' AS FLOAT),false),(3, NULL, NULL, 4.5, NULL)`) + sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'Alice', 3, 0.5032135844230652, true, ARRAY[]), (2, 'Bob', + 2, CAST('nan' AS FLOAT),false, NULL),(3, NULL, NULL, 4.5, NULL, ARRAY[1,NULL,3])`) foo := feed(t, f, test.changefeedStmt) defer closeFeed(t, foo) assertPayloads(t, foo, []string{ - `foo: [1]->{"after": {"a": true, "i": 1, "x": "Alice", "y": 3, "z": 0.5032135844230652}}`, - `foo: [2]->{"after": {"a": false, "i": 2, "x": "Bob", "y": 2, "z": "NaN"}}`, - `foo: [3]->{"after": {"a": null, "i": 3, "x": null, "y": null, "z": 4.5}}`, + `foo: [1]->{"after": {"a": true, "c": [], "i": 1, "x": "Alice", "y": 3, "z": 0.5032135844230652}}`, + `foo: [2]->{"after": {"a": false, "c": null, "i": 2, "x": "Bob", "y": 2, "z": "NaN"}}`, + `foo: [3]->{"after": {"a": null, "c": [1, null, 3], "i": 3, "x": null, "y": null, "z": 4.5}}`, }) sqlDB.Exec(t, `UPDATE foo SET x='wonderland' where i=1`) assertPayloads(t, foo, []string{ - `foo: [1]->{"after": {"a": true, "i": 1, "x": "wonderland", "y": 3, "z": 0.5032135844230652}}`, + `foo: [1]->{"after": {"a": true, "c": [], "i": 1, "x": "wonderland", "y": 3, "z": 0.5032135844230652}}`, }) sqlDB.Exec(t, `DELETE from foo where i=1`) diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index 
ef9939a22361..49eeba5161cd 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -936,7 +936,6 @@ func makeFeedFactoryWithOptions( userDB, cleanup := getInitialDBForEnterpriseFactory(t, s, db, options) f.(*cloudFeedFactory).configureUserDB(userDB) return f, func() { - TestingSetIncludeParquetMetadata()() cleanup() } case "enterprise": @@ -1081,12 +1080,20 @@ func cdcTestNamedWithSystem( testLabel = fmt.Sprintf("%s/%s", sinkType, name) } t.Run(testLabel, func(t *testing.T) { + // Even if the parquet format is not being used, enable metadata + // in all tests for simplicity. testServer, cleanupServer := makeServerWithOptions(t, options) + knobs := testServer.TestingKnobs. + DistSQL.(*execinfra.TestingKnobs). + Changefeed.(*TestingKnobs) + knobs.EnableParquetMetadata = true + feedFactory, cleanupSink := makeFeedFactoryWithOptions(t, sinkType, testServer.Server, testServer.DB, options) feedFactory = maybeUseExternalConnection(feedFactory, testServer.DB, sinkType, options, t) defer cleanupServer() defer cleanupSink() defer cleanupCloudStorage() + testFn(t, testServer, feedFactory) }) } @@ -1277,15 +1284,6 @@ func waitForJobStatus( }) } -// TestingSetIncludeParquetMetadata adds the option to turn on adding metadata -// to the parquet file which is used in testing. -func TestingSetIncludeParquetMetadata() func() { - includeParquetTestMetadata = true - return func() { - includeParquetTestMetadata = false - } -} - // ChangefeedJobPermissionsTestSetup creates entities and users with various permissions // for tests which test access control for changefeed jobs. // diff --git a/pkg/ccl/changefeedccl/parquet.go b/pkg/ccl/changefeedccl/parquet.go index 4caeeef2efcc..3fc98aa8d5c7 100644 --- a/pkg/ccl/changefeedccl/parquet.go +++ b/pkg/ccl/changefeedccl/parquet.go @@ -10,6 +10,7 @@ package changefeedccl import ( "io" + "strings" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -22,13 +23,13 @@ type parquetWriter struct { datumAlloc []tree.Datum } -// newParquetWriterFromRow constructs a new parquet writer which outputs to -// the given sink. This function interprets the schema from the supplied row. -func newParquetWriterFromRow( - row cdcevent.Row, sink io.Writer, opts ...parquet.Option, -) (*parquetWriter, error) { - columnNames := make([]string, len(row.ResultColumns())+1) - columnTypes := make([]*types.T, len(row.ResultColumns())+1) +// newParquetSchemaDefintion returns a parquet schema definition based on the +// cdcevent.Row and the number of cols in the schema. +func newParquetSchemaDefintion(row cdcevent.Row) (*parquet.SchemaDefinition, int, error) { + numCols := len(row.ResultColumns()) + 1 + + columnNames := make([]string, numCols) + columnTypes := make([]*types.T, numCols) idx := 0 if err := row.ForEachColumn().Col(func(col cdcevent.ResultColumn) error { @@ -37,19 +38,41 @@ func newParquetWriterFromRow( idx += 1 return nil }); err != nil { - return nil, err + return nil, 0, err } columnNames[idx] = parquetCrdbEventTypeColName columnTypes[idx] = types.String schemaDef, err := parquet.NewSchema(columnNames, columnTypes) + if err != nil { + return nil, 0, err + } + return schemaDef, numCols, nil +} + +// newParquetWriterFromRow constructs a new parquet writer which outputs to +// the given sink. This function interprets the schema from the supplied row. 
+func newParquetWriterFromRow( + row cdcevent.Row, + sink io.Writer, + knobs *TestingKnobs, /* may be nil */ + opts ...parquet.Option, +) (*parquetWriter, error) { + schemaDef, numCols, err := newParquetSchemaDefintion(row) if err != nil { return nil, err } writerConstructor := parquet.NewWriter - if includeParquetTestMetadata { + + if knobs.EnableParquetMetadata { + if opts, err = addParquetTestMetadata(row, opts); err != nil { + return nil, err + } + + // To use parquet test utils for reading datums, the writer needs to be + // configured with additional metadata. writerConstructor = parquet.NewWriterWithReaderMeta } @@ -57,7 +80,7 @@ func newParquetWriterFromRow( if err != nil { return nil, err } - return &parquetWriter{inner: writer, datumAlloc: make([]tree.Datum, len(columnNames))}, nil + return &parquetWriter{inner: writer, datumAlloc: make([]tree.Datum, numCols)}, nil } // addData writes the updatedRow, adding the row's event type. There is no guarantee @@ -70,7 +93,7 @@ func (w *parquetWriter) addData(updatedRow cdcevent.Row, prevRow cdcevent.Row) e return w.inner.AddRow(w.datumAlloc) } -// Close closes the writer and flushes any buffered data to the sink. +// close closes the writer and flushes any buffered data to the sink. func (w *parquetWriter) close() error { return w.inner.Close() } @@ -88,3 +111,27 @@ func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc [] datums = append(datums, getEventTypeDatum(updatedRow, prevRow).DString()) return nil } + +// addParquetTestMetadata appends options to the provided options to configure the +// parquet writer to write metadata required by cdc test feed factories. +func addParquetTestMetadata(row cdcevent.Row, opts []parquet.Option) ([]parquet.Option, error) { + keyCols := make([]string, 0) + if err := row.ForEachKeyColumn().Col(func(col cdcevent.ResultColumn) error { + keyCols = append(keyCols, col.Name) + return nil + }); err != nil { + return opts, err + } + opts = append(opts, parquet.WithMetadata(map[string]string{"keyCols": strings.Join(keyCols, ",")})) + + allCols := make([]string, 0) + if err := row.ForEachColumn().Col(func(col cdcevent.ResultColumn) error { + allCols = append(allCols, col.Name) + return nil + }); err != nil { + return opts, err + } + allCols = append(allCols, parquetCrdbEventTypeColName) + opts = append(opts, parquet.WithMetadata(map[string]string{"allCols": strings.Join(allCols, ",")})) + return opts, nil +} diff --git a/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go b/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go index 82e63e671aa9..78401efaff29 100644 --- a/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go +++ b/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go @@ -9,27 +9,16 @@ package changefeedccl import ( - "bytes" "context" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" - pqexporter "github.com/cockroachdb/cockroach/pkg/sql/importer" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/parquet" "github.com/cockroachdb/errors" - goparquet "github.com/fraugster/parquet-go" - "github.com/fraugster/parquet-go/parquet" - "github.com/fraugster/parquet-go/parquetschema" ) -// This variable controls whether we add primary keys of the table to the -// metadata of the parquet file. Currently, this will be true only under -// testing. 
-// TODO(cdc): We should consider including this metadata during production also -var includeParquetTestMetadata = false - // This is an extra column that will be added to every parquet file which tells // us about the type of event that generated a particular row. The types are // defined below. @@ -54,8 +43,8 @@ func (e parquetEventType) DString() *tree.DString { } // We need a separate sink for parquet format because the parquet encoder has to -// write metadata to the parquet file (buffer) after each flush. This means that the -// parquet encoder should have access to the buffer object inside +// write metadata to the parquet file (buffer) after each flush. This means that +// the parquet encoder should have access to the buffer object inside // cloudStorageSinkFile file. This means that the parquet writer has to be // embedded in the cloudStorageSinkFile file. If we wanted to maintain the // existing separation between encoder and the sync, then we would need to @@ -75,24 +64,26 @@ type parquetCloudStorageSink struct { compression parquet.CompressionCodec } -type parquetFileWriter struct { - parquetWriter *goparquet.FileWriter - schema *parquetschema.SchemaDefinition - parquetColumns []pqexporter.ParquetColumn - numCols int -} - func makeParquetCloudStorageSink( baseCloudStorageSink *cloudStorageSink, ) (*parquetCloudStorageSink, error) { - parquetSink := &parquetCloudStorageSink{wrapped: baseCloudStorageSink} - if !baseCloudStorageSink.compression.enabled() { - parquetSink.compression = parquet.CompressionCodec_UNCOMPRESSED - } else if baseCloudStorageSink.compression == sinkCompressionGzip { - parquetSink.compression = parquet.CompressionCodec_GZIP - } else { - return nil, errors.AssertionFailedf("Specified compression not supported with parquet") + parquetSink := &parquetCloudStorageSink{ + wrapped: baseCloudStorageSink, + compression: parquet.CompressionNone, } + if baseCloudStorageSink.compression.enabled() { + switch baseCloudStorageSink.compression { + case sinkCompressionGzip: + parquetSink.compression = parquet.CompressionGZIP + case sinkCompressionZstd: + parquetSink.compression = parquet.CompressionZSTD + default: + return nil, errors.AssertionFailedf( + "unexpected compression codec %s", baseCloudStorageSink.compression, + ) + } + } + return parquetSink, nil } @@ -110,7 +101,7 @@ func (parquetSink *parquetCloudStorageSink) EmitRow( updated, mvcc hlc.Timestamp, alloc kvevent.Alloc, ) error { - return errors.AssertionFailedf("Emit Row should not be called for parquet format") + return errors.AssertionFailedf("EmitRow unimplemented by the parquet cloud storage sink") } // Close implements the Sink interface. 
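// A minimal sketch, not part of this diff, of what the per-file parquet codec
// built by this sink reduces to, using only the pkg/util/parquet surface the
// change relies on (NewSchema, options, AddRow, Close). The argument order of
// parquet.NewWriter is assumed from its use in parquet.go; the function name,
// column set, and GZIP codec below are hypothetical, and the snippet assumes
// the changefeedccl package context plus the bytes, tree, types, and
// util/parquet imports.
func writeOneRowSketch(buf *bytes.Buffer, datums []tree.Datum) error {
	// The schema is derived from column names and CRDB types, with the extra
	// event-type column described above appended last.
	schemaDef, err := parquet.NewSchema(
		[]string{"i", "x", parquetCrdbEventTypeColName},
		[]*types.T{types.Int, types.String, types.String},
	)
	if err != nil {
		return err
	}
	writer, err := parquet.NewWriter(schemaDef, buf,
		parquet.WithCompressionCodec(parquet.CompressionGZIP))
	if err != nil {
		return err
	}
	// datums must match the schema: one datum per column, the last one being
	// the event-type string.
	if err := writer.AddRow(datums); err != nil {
		return err
	}
	// Close flushes buffered row groups and the parquet footer into buf, which
	// is why the writer must live alongside the file buffer it writes to.
	return writer.Close()
}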
@@ -157,50 +148,22 @@ func (parquetSink *parquetCloudStorageSink) EncodeAndEmitRow( if file.parquetCodec == nil { var err error - file.parquetCodec, err = makeParquetWriterWrapper(ctx, updatedRow, &file.buf, parquetSink.compression) + file.parquetCodec, err = newParquetWriterFromRow( + updatedRow, &file.buf, parquetSink.wrapped.testingKnobs, + parquet.WithCompressionCodec(parquetSink.compression)) if err != nil { return err } } - colOrd := -1 - // TODO (ganeshb): Avoid map allocation on every call to emit row - parquetRow := make(map[string]interface{}, file.parquetCodec.numCols) - if err := updatedRow.ForAllColumns().Datum(func(d tree.Datum, col cdcevent.ResultColumn) error { - colOrd++ - // Omit NULL columns from parquet row - if d == tree.DNull { - parquetRow[col.Name] = nil - return nil - } - encodeFn, err := file.parquetCodec.parquetColumns[colOrd].GetEncoder() - if err != nil { - return err - } - edNative, err := encodeFn(d) - if err != nil { - return err - } - - parquetRow[col.Name] = edNative - - return nil - - }); err != nil { - return err - } - - et := getEventTypeDatum(updatedRow, prevRow) - parquetRow[parquetCrdbEventTypeColName] = []byte(et.DString().String()) - - if err = file.parquetCodec.parquetWriter.AddData(parquetRow); err != nil { + if err := file.parquetCodec.addData(updatedRow, prevRow); err != nil { return err } - if file.parquetCodec.parquetWriter.CurrentRowGroupSize() > s.targetMaxFileSize { + if int64(file.buf.Len()) > s.targetMaxFileSize { s.metrics.recordSizeBasedFlush() - if err = file.parquetCodec.parquetWriter.Close(); err != nil { + if err = file.parquetCodec.close(); err != nil { return err } if err := s.flushTopicVersions(ctx, file.topic, file.schemaID); err != nil { @@ -219,106 +182,3 @@ func getEventTypeDatum(updatedRow cdcevent.Row, prevRow cdcevent.Row) parquetEve } return parquetEventInsert } - -func makeParquetWriterWrapper( - ctx context.Context, row cdcevent.Row, buf *bytes.Buffer, compression parquet.CompressionCodec, -) (*parquetFileWriter, error) { - parquetColumns, err := getParquetColumnTypes(ctx, row) - if err != nil { - return nil, err - } - - schema := pqexporter.NewParquetSchema(parquetColumns) - - parquetWriterOptions := make([]goparquet.FileWriterOption, 0) - - // TODO(cdc): We really should revisit if we should include any metadata in - // parquet files. There are plenty things we can include there, including crdb - // native column types, OIDs for those column types, etc - if includeParquetTestMetadata { - metadata, err := getMetadataForParquetFile(ctx, row) - if err != nil { - return nil, err - } - parquetWriterOptions = append(parquetWriterOptions, goparquet.WithMetaData(metadata)) - } - - // TODO(cdc): Determine if we should parquet's builtin compressor or rely on - // sinks compressing. 
Currently using not parquets builtin compressor, relying - // on sinks compression - parquetWriterOptions = append(parquetWriterOptions, goparquet.WithSchemaDefinition(schema)) - parquetWriterOptions = append(parquetWriterOptions, goparquet.WithCompressionCodec(compression)) - pqw := goparquet.NewFileWriter(buf, - parquetWriterOptions..., - ) - - pqww := &parquetFileWriter{ - pqw, - schema, - parquetColumns, - len(parquetColumns), - } - return pqww, nil -} - -func getMetadataForParquetFile(ctx context.Context, row cdcevent.Row) (map[string]string, error) { - metadata := make(map[string]string) - primaryKeyColNames := "" - columnNames := "" - if err := row.ForEachKeyColumn().Datum(func(d tree.Datum, col cdcevent.ResultColumn) error { - primaryKeyColNames += col.Name + "," - return nil - }); err != nil { - return nil, err - } - metadata["primaryKeyNames"] = primaryKeyColNames - if err := row.ForEachColumn().Datum(func(d tree.Datum, col cdcevent.ResultColumn) error { - columnNames += col.Name + "," - return nil - }); err != nil { - return nil, err - } - metadata["columnNames"] = columnNames - return metadata, nil -} - -func getParquetColumnTypes( - ctx context.Context, row cdcevent.Row, -) ([]pqexporter.ParquetColumn, error) { - typs := make([]*types.T, 0) - names := make([]string, 0) - - if err := row.ForAllColumns().Datum(func(d tree.Datum, col cdcevent.ResultColumn) error { - typs = append(typs, col.Typ) - names = append(names, col.Name) - return nil - }); err != nil { - return nil, err - } - - parquetColumns := make([]pqexporter.ParquetColumn, len(typs)+1) - const nullable = true - - for i := 0; i < len(typs); i++ { - // Make every field optional, so that all schema evolutions for a table are - // considered "backward compatible" by parquet. This means that the parquet - // type doesn't mirror the column's nullability, but it makes it much easier - // to work with long histories of table data afterward, especially for - // things like loading into analytics databases. - parquetCol, err := pqexporter.NewParquetColumn(typs[i], names[i], nullable) - if err != nil { - return nil, err - } - parquetColumns[i] = parquetCol - } - - // Add the extra column which will store the type of event that generated that - // particular row. - var err error - parquetColumns[len(typs)], err = pqexporter.NewParquetColumn(types.String, parquetCrdbEventTypeColName, false) - if err != nil { - return nil, err - } - - return parquetColumns, nil -} diff --git a/pkg/ccl/changefeedccl/parquet_test.go b/pkg/ccl/changefeedccl/parquet_test.go index 98a54b658bc1..d32f06655608 100644 --- a/pkg/ccl/changefeedccl/parquet_test.go +++ b/pkg/ccl/changefeedccl/parquet_test.go @@ -38,8 +38,6 @@ func TestParquetRows(t *testing.T) { // Rangefeed reader can time out under stress. skip.UnderStress(t) - defer TestingSetIncludeParquetMetadata()() - ctx := context.Background() s, db, _ := serverutils.StartServer(t, base.TestServerArgs{ // TODO(#98816): cdctest.GetHydratedTableDescriptor does not work with tenant dbs. 
@@ -112,7 +110,8 @@ func TestParquetRows(t *testing.T) { require.NoError(t, err) if writer == nil { - writer, err = newParquetWriterFromRow(updatedRow, f, parquet.WithMaxRowGroupLength(maxRowGroupSize)) + writer, err = newParquetWriterFromRow(updatedRow, f, &TestingKnobs{EnableParquetMetadata: true}, parquet.WithMaxRowGroupLength(maxRowGroupSize), + parquet.WithCompressionCodec(parquet.CompressionGZIP)) if err != nil { t.Fatalf(err.Error()) } diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index 5c15ceca655d..6c14c8bf21d6 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -261,6 +261,11 @@ func getSink( } case isCloudStorageSink(u): return validateOptionsAndMakeSink(changefeedbase.CloudStorageValidOptions, func() (Sink, error) { + var testingKnobs *TestingKnobs + if knobs, ok := serverCfg.TestingKnobs.Changefeed.(*TestingKnobs); ok { + testingKnobs = knobs + } + // Placeholder id for canary sink var nodeID base.SQLInstanceID = 0 if serverCfg.NodeID != nil { @@ -268,7 +273,7 @@ func getSink( } return makeCloudStorageSink( ctx, sinkURL{URL: u}, nodeID, serverCfg.Settings, encodingOpts, - timestampOracle, serverCfg.ExternalStorageFromURI, user, metricsBuilder, + timestampOracle, serverCfg.ExternalStorageFromURI, user, metricsBuilder, testingKnobs, ) }) case u.Scheme == changefeedbase.SinkSchemeExperimentalSQL: diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage.go b/pkg/ccl/changefeedccl/sink_cloudstorage.go index 67744d2e8e47..a3ca5bb532a2 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage.go @@ -72,7 +72,7 @@ type cloudStorageSinkFile struct { buf bytes.Buffer alloc kvevent.Alloc oldestMVCC hlc.Timestamp - parquetCodec *parquetFileWriter + parquetCodec *parquetWriter } var _ io.Writer = &cloudStorageSinkFile{} @@ -280,8 +280,10 @@ func (f *cloudStorageSinkFile) Write(p []byte) (int, error) { // job (call it P). Now, we're back to the case where k = 2 with jobs P and Q. Thus, by // induction we have the required proof. type cloudStorageSink struct { - srcID base.SQLInstanceID - sinkID int64 + srcID base.SQLInstanceID + sinkID int64 + + // targetMaxFileSize is the max target file size in bytes. targetMaxFileSize int64 settings *cluster.Settings partitionFormat string @@ -314,6 +316,9 @@ type cloudStorageSink struct { asyncFlushCh chan flushRequest // channel for submitting flush requests. asyncFlushTermCh chan struct{} // channel closed by async flusher to indicate an error asyncFlushErr error // set by async flusher, prior to closing asyncFlushTermCh + + // testingKnobs may be nil if no knobs are set. + testingKnobs *TestingKnobs } type flushRequest struct { @@ -359,6 +364,7 @@ func makeCloudStorageSink( makeExternalStorageFromURI cloud.ExternalStorageFromURIFactory, user username.SQLUsername, mb metricsRecorderBuilder, + testingKnobs *TestingKnobs, ) (Sink, error) { var targetMaxFileSize int64 = 16 << 20 // 16MB if fileSizeParam := u.consumeParam(changefeedbase.SinkParamFileSize); fileSizeParam != `` { @@ -399,6 +405,7 @@ func makeCloudStorageSink( flushGroup: ctxgroup.WithContext(ctx), asyncFlushCh: make(chan flushRequest, flushQueueDepth), asyncFlushTermCh: make(chan struct{}), + testingKnobs: testingKnobs, } s.flushGroup.GoCtx(s.asyncFlusher) @@ -688,11 +695,13 @@ func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSink } s.asyncFlushActive = asyncFlushEnabled + // If using parquet, we need to finish off writing the entire file. 
+ // Closing the parquet codec will append some metadata to the file. if file.parquetCodec != nil { - if err := file.parquetCodec.parquetWriter.Close(); err != nil { + if err := file.parquetCodec.close(); err != nil { return err } - file.rawSize = len(file.buf.Bytes()) + file.rawSize = file.buf.Len() } // We use this monotonically increasing fileID to ensure correct ordering // among files emitted at the same timestamp during the same job session. diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go index 160e648efa30..c3a0430ae8f2 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go @@ -211,8 +211,8 @@ func TestCloudStorageSink(t *testing.T) { timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} s, err := makeCloudStorageSink( - ctx, sinkURI(t, unlimitedFileSize), 1, settings, - opts, timestampOracle, externalStorageFromURI, user, nil, + ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts, + timestampOracle, externalStorageFromURI, user, nil, nil, ) require.NoError(t, err) defer func() { require.NoError(t, s.Close()) }() @@ -263,8 +263,8 @@ func TestCloudStorageSink(t *testing.T) { require.NoError(t, err) timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} s, err := makeCloudStorageSink( - ctx, sinkURI(t, unlimitedFileSize), 1, settings, - opts, timestampOracle, externalStorageFromURI, user, nil, + ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts, + timestampOracle, externalStorageFromURI, user, nil, nil, ) require.NoError(t, err) defer func() { require.NoError(t, s.Close()) }() @@ -340,14 +340,14 @@ func TestCloudStorageSink(t *testing.T) { require.NoError(t, err) timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} s1, err := makeCloudStorageSink( - ctx, sinkURI(t, unlimitedFileSize), 1, - settings, opts, timestampOracle, externalStorageFromURI, user, nil, + ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts, + timestampOracle, externalStorageFromURI, user, nil, nil, ) require.NoError(t, err) defer func() { require.NoError(t, s1.Close()) }() s2, err := makeCloudStorageSink( - ctx, sinkURI(t, unlimitedFileSize), 2, - settings, opts, timestampOracle, externalStorageFromURI, user, nil, + ctx, sinkURI(t, unlimitedFileSize), 2, settings, opts, + timestampOracle, externalStorageFromURI, user, nil, nil, ) defer func() { require.NoError(t, s2.Close()) }() require.NoError(t, err) @@ -376,14 +376,14 @@ func TestCloudStorageSink(t *testing.T) { // this happens before checkpointing, some data is written again but // this is unavoidable. 
s1R, err := makeCloudStorageSink( - ctx, sinkURI(t, unbuffered), 1, - settings, opts, timestampOracle, externalStorageFromURI, user, nil, + ctx, sinkURI(t, unbuffered), 1, settings, opts, + timestampOracle, externalStorageFromURI, user, nil, nil, ) require.NoError(t, err) defer func() { require.NoError(t, s1R.Close()) }() s2R, err := makeCloudStorageSink( - ctx, sinkURI(t, unbuffered), 2, - settings, opts, timestampOracle, externalStorageFromURI, user, nil, + ctx, sinkURI(t, unbuffered), 2, settings, opts, + timestampOracle, externalStorageFromURI, user, nil, nil, ) require.NoError(t, err) defer func() { require.NoError(t, s2R.Close()) }() @@ -424,16 +424,16 @@ func TestCloudStorageSink(t *testing.T) { require.NoError(t, err) timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} s1, err := makeCloudStorageSink( - ctx, sinkURI(t, unlimitedFileSize), 1, - settings, opts, timestampOracle, externalStorageFromURI, user, nil, + ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts, + timestampOracle, externalStorageFromURI, user, nil, nil, ) require.NoError(t, err) defer func() { require.NoError(t, s1.Close()) }() s1.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID. s1.(*cloudStorageSink).jobSessionID = "a" // Force deterministic job session ID. s2, err := makeCloudStorageSink( - ctx, sinkURI(t, unlimitedFileSize), 1, - settings, opts, timestampOracle, externalStorageFromURI, user, nil, + ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts, + timestampOracle, externalStorageFromURI, user, nil, nil, ) require.NoError(t, err) defer func() { require.NoError(t, s2.Close()) }() @@ -471,8 +471,8 @@ func TestCloudStorageSink(t *testing.T) { timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} const targetMaxFileSize = 6 s, err := makeCloudStorageSink( - ctx, sinkURI(t, targetMaxFileSize), 1, - settings, opts, timestampOracle, externalStorageFromURI, user, nil, + ctx, sinkURI(t, targetMaxFileSize), 1, settings, opts, + timestampOracle, externalStorageFromURI, user, nil, nil, ) require.NoError(t, err) defer func() { require.NoError(t, s.Close()) }() @@ -612,8 +612,8 @@ func TestCloudStorageSink(t *testing.T) { sinkURIWithParam.addParam(changefeedbase.SinkParamPartitionFormat, tc.format) t.Logf("format=%s sinkgWithParam: %s", tc.format, sinkURIWithParam.String()) s, err := makeCloudStorageSink( - ctx, sinkURIWithParam, 1, - settings, opts, timestampOracle, externalStorageFromURI, user, nil, + ctx, sinkURIWithParam, 1, settings, opts, + timestampOracle, externalStorageFromURI, user, nil, nil, ) require.NoError(t, err) @@ -645,8 +645,8 @@ func TestCloudStorageSink(t *testing.T) { require.NoError(t, err) timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} s, err := makeCloudStorageSink( - ctx, sinkURI(t, unlimitedFileSize), 1, - settings, opts, timestampOracle, externalStorageFromURI, user, nil, + ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts, + timestampOracle, externalStorageFromURI, user, nil, nil, ) require.NoError(t, err) defer func() { require.NoError(t, s.Close()) }() @@ -705,8 +705,8 @@ func TestCloudStorageSink(t *testing.T) { timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} var targetMaxFileSize int64 = 10 s, err := makeCloudStorageSink( - ctx, sinkURI(t, targetMaxFileSize), 1, settings, - opts, timestampOracle, externalStorageFromURI, user, nil) + ctx, sinkURI(t, targetMaxFileSize), 1, settings, opts, + timestampOracle, externalStorageFromURI, user, nil, nil) require.NoError(t, err) defer func() { require.NoError(t, s.Close()) }() diff 
--git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index 2c83d846bafc..855de6637277 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -16,8 +16,6 @@ import ( "encoding/base64" gojson "encoding/json" "fmt" - "io" - "math" "math/rand" "net/url" "os" @@ -46,6 +44,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" @@ -57,11 +56,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/json" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/parquet" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" - goparquet "github.com/fraugster/parquet-go" "github.com/jackc/pgx/v4" "google.golang.org/api/option" "google.golang.org/grpc" @@ -1059,9 +1058,9 @@ func (f *cloudFeedFactory) Feed( tree.KVOption{Key: changefeedbase.OptKeyInValue}, ) } - // Determine if we can enable parquet output if the changefeed is compatible - // with parquet format, if no format is specified. If it is, we will use - // parquet format with a probability of 0.4. The rest of the time json is used + // Determine if we can enable the parquet format if the changefeed is not + // being created with incompatible options. If it can be enabled, we will use + // parquet format with a probability of 0.4. parquetPossible := true explicitEnvelope := false @@ -1070,11 +1069,12 @@ func (f *cloudFeedFactory) Feed( explicitEnvelope = true } - if string(opt.Key) == changefeedbase.OptFormat { + if string(opt.Key) == changefeedbase.OptFormat && + opt.Value.String() != string(changefeedbase.OptFormatParquet) { parquetPossible = false break } - for o := range changefeedbase.InitialScanOnlyUnsupportedOptions { + for o := range changefeedbase.ParquetFormatUnsupportedOptions { if o == string(opt.Key) { parquetPossible = false break @@ -1082,12 +1082,11 @@ func (f *cloudFeedFactory) Feed( } } randNum := rand.Intn(5) - if randNum < 3 { + if randNum < 2 { parquetPossible = false } if parquetPossible { - TestingSetIncludeParquetMetadata() - log.Infof(context.Background(), "Using parquet format") + log.Infof(context.Background(), "using parquet format") createStmt.Options = append( createStmt.Options, tree.KVOption{ @@ -1228,199 +1227,133 @@ func extractKeyFromJSONValue(isBare bool, wrapped []byte) (key []byte, value []b return key, value, nil } -func (c *cloudFeed) decodeParquetValueAsJSON(value interface{}) (interface{}, error) { - switch vv := value.(type) { - case []byte: - // Currently, for encoding from CRDB data type to Parquet data type, for - // any complex structure (that is, other than ints and floats), it is - // always a byte array which is the string representation of that datum - // (except for arrays, see below). Therefore, if the parquet reader - // decodes a column value as a Go native byte array, then we can be sure - // that it is the equivalent string representation of the CRDB datum. 
- // Hence, we can use this value to construct the final JSON object which - // will be used by assertPayload to compare actual and expected JSON - // objects. - - // Ideally there should be no need to convert byte array to string but - // JSON encoder will encode byte arrays as base 64 encoded strings. - // Hence, we need to convert to string to tell Marshal to decode it as a - // string. For every other Go native type, we can use the type as is and - // json.Marhsal will work correctly. - return string(vv), nil - case map[string]interface{}: - // This is CRDB ARRAY data type (only data type for which we use parquet - // LIST logical type. For all other CRDB types, it's either a byte array - // or a primitive parquet type). See importer.NewParquetColumn for - // details on how CRDB Array is encoded to parquet. (read it before - // reading the rest of the comments). Ideally, the parquet reader should - // decode the encoded CRDB array datum as a go native list type. But we - // use a low level API provided by the vendor which decodes parquet's - // LIST logical data type - // (https://github.com/apache/parquet-format/blob/master/LogicalTypes.md) - // into this weird map structure in Go (It actually makes a lot of sense - // why this is done if you understand the parquet LIST logical data - // type). A higher level API would convert this map data structure into - // go native list type which is what the code below does. This would - // probably need to be changed if the parquet vendor is changed. - - // TODO(ganeshb): Make sure that the library is indeed decoding parquet lists - // into this weird map format and it is not because of the way we encode - vtemp := make([]interface{}, 0) - if castedValue, ok := vv["list"].([]map[string]interface{}); ok { - for _, ele := range castedValue { - if elementVal, ok := ele["element"]; ok { - if byteTypeElement, ok := elementVal.([]byte); ok { - vtemp = append(vtemp, string(byteTypeElement)) - } else { - // Primitive types - vtemp = append(vtemp, ele["element"]) - } - } else { - return nil, errors.Errorf("Data structure returned by parquet vendor for CRDB ARRAY type is not as expected.") - } - } - } else { - return nil, errors.Errorf("Data structure returned by parquet vendor for CRDB ARRAY type is not as expected.") - } - return vtemp, nil - default: - // int's, float's and other primitive types - if floatVal, ok := vv.(float64); ok { - // gojson cannot encode NaN values - // https://github.com/golang/go/issues/25721 - if math.IsNaN(floatVal) { - return "NaN", nil - } - } - return vv, nil - } -} - -// appendParquetTestFeedMessages function reads the parquet file and converts each row to its JSON -// equivalent and appends it to the cloudfeed's row object. +// appendParquetTestFeedMessages function reads the parquet file and converts +// each row to its JSON equivalent and appends it to the cloudfeed's row object. 
func (c *cloudFeed) appendParquetTestFeedMessages( path string, topic string, envelopeType changefeedbase.EnvelopeType, -) error { - f, err := os.Open(path) +) (err error) { + meta, datums, closeReader, err := parquet.ReadFile(path) if err != nil { return err } - defer f.Close() + defer func() { + closeErr := closeReader() + if closeErr != nil { + err = errors.CombineErrors(err, closeErr) + } + }() - fr, err := goparquet.NewFileReader(f) - if err != nil { - return err + primaryKeyColumnsString := meta.KeyValueMetadata().FindValue("keyCols") + if primaryKeyColumnsString == nil { + return errors.Errorf("could not find primary key column names in parquet metadata") } - primaryKeyColumnsString, ok := fr.MetaData()["primaryKeyNames"] - if !ok { - return errors.Errorf("Did not find primary key column names in metadata of parquet file during testing") + columnsNamesString := meta.KeyValueMetadata().FindValue("allCols") + if columnsNamesString == nil { + return errors.Errorf("could not find column names in parquet metadata") } - columnsNamesString, ok := fr.MetaData()["columnNames"] - if !ok { - return errors.Errorf("Did not find column names in metadata of parquet file during testing") - } - columns := strings.Split(columnsNamesString, ",") - primaryKeys := strings.Split(primaryKeyColumnsString, ",") + primaryKeys := strings.Split(*primaryKeyColumnsString, ",") + columns := strings.Split(*columnsNamesString, ",") columnNameSet := make(map[string]struct{}) primaryKeyColumnSet := make(map[string]struct{}) - for _, key := range primaryKeys[:len(primaryKeys)-1] { + for _, key := range primaryKeys { primaryKeyColumnSet[key] = struct{}{} } + // Drop parquetCrdbEventTypeColName. for _, key := range columns[:len(columns)-1] { columnNameSet[key] = struct{}{} } - for { - row, err := fr.NextRow() - if err == io.EOF { - break - } + for _, row := range datums { + isDeleted := false + // Remove parquetCrdbEventTypeColName from the column names list. Values + // for this column will still be present in datum rows. + rowCopy := make([]string, len(columns)-1) + copy(rowCopy, columns[:len(columns)-1]) + rowJSONBuilder, err := json.NewFixedKeysObjectBuilder(rowCopy) if err != nil { return err } - // Holds column to its value mapping for a row and the entire thing will be - // JSON encoded later. - value := make(map[string]interface{}) - - // Holds the mapping of primary keys and its values. 
- key := make(map[string]interface{}, 0) - isDeleted := false - - for k, v := range row { + keyJSONBuilder := json.NewArrayBuilder(len(primaryKeys)) + for colIdx, v := range row { + k := columns[colIdx] if k == parquetCrdbEventTypeColName { - if string(v.([]byte)) == parquetEventDelete.DString().String() { + if *v.(*tree.DString) == *parquetEventDelete.DString() { isDeleted = true } continue } if _, ok := columnNameSet[k]; ok { - value[k], err = c.decodeParquetValueAsJSON(v) + j, err := tree.AsJSON(v, sessiondatapb.DataConversionConfig{}, time.UTC) if err != nil { return err } + if err := rowJSONBuilder.Set(k, j); err != nil { + return err + } } if _, ok := primaryKeyColumnSet[k]; ok { - decodedKeyVal, err := c.decodeParquetValueAsJSON(v) + j, err := tree.AsJSON(v, sessiondatapb.DataConversionConfig{}, time.UTC) if err != nil { return err } - key[k] = decodedKeyVal - } - } - - if !isDeleted { - for col := range columnNameSet { - if _, ok := value[col]; !ok { - value[col] = nil - } - } - } else { - for k := range primaryKeyColumnSet { - delete(value, k) + keyJSONBuilder.Add(j) } } - valueWithAfter := make(map[string]interface{}) + var valueWithAfter *json.FixedKeysObjectBuilder if envelopeType == changefeedbase.OptEnvelopeBare { - valueWithAfter = value + valueWithAfter = rowJSONBuilder } else { + valueWithAfter, err = json.NewFixedKeysObjectBuilder([]string{"after"}) + if err != nil { + return err + } if isDeleted { - valueWithAfter["after"] = nil + nullJSON, err := tree.AsJSON(tree.DNull, sessiondatapb.DataConversionConfig{}, time.UTC) + if err != nil { + return err + } + if err = valueWithAfter.Set("after", nullJSON); err != nil { + return err + } } else { - valueWithAfter["after"] = value + vbJson, err := rowJSONBuilder.Build() + if err != nil { + return err + } + if err = valueWithAfter.Set("after", vbJson); err != nil { + return err + } } } - orderedKey := make([]interface{}, 0) - for _, k := range primaryKeys[:len(primaryKeys)-1] { - orderedKey = append(orderedKey, key[k]) - } + keyJSON := keyJSONBuilder.Build() - // Sorts the keys - jsonValue, err := reformatJSON(valueWithAfter) + rowJSON, err := valueWithAfter.Build() if err != nil { return err } - jsonKey, err := reformatJSON(orderedKey) - if err != nil { - return err - } + var keyBuf bytes.Buffer + keyJSON.Format(&keyBuf) + + var rowBuf bytes.Buffer + rowJSON.Format(&rowBuf) m := &cdctest.TestFeedMessage{ Topic: topic, - Value: jsonValue, - Key: jsonKey, + Value: rowBuf.Bytes(), + Key: keyBuf.Bytes(), } if isNew := c.markSeen(m); !isNew { diff --git a/pkg/ccl/changefeedccl/testing_knobs.go b/pkg/ccl/changefeedccl/testing_knobs.go index f50f775864db..d49815132ae1 100644 --- a/pkg/ccl/changefeedccl/testing_knobs.go +++ b/pkg/ccl/changefeedccl/testing_knobs.go @@ -74,6 +74,10 @@ type TestingKnobs struct { // OnDrain returns the channel to select on to detect node drain OnDrain func() <-chan struct{} + + // EnableParquetMetadata configures the parquet format writer to write + // metadata which is required for testing. + EnableParquetMetadata bool } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. 
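The sketch below shows, roughly, how EnableParquetMetadata is consumed end to end: when the knob is set, the writer in parquet.go attaches "keyCols"/"allCols" key-value metadata and uses the reader-meta constructor, and the test feed in testfeed_test.go reads that metadata back to decode rows. This is an illustration only; the function name, path handling, and error message are hypothetical, and the reader-side calls are assumed to behave as they are used in testfeed_test.go above.

func readTestParquetFileSketch(path string) error {
	meta, datums, closeReader, err := parquet.ReadFile(path)
	if err != nil {
		return err
	}
	defer func() { _ = closeReader() }()

	// Written by addParquetTestMetadata when the knob is enabled; nil when the
	// file was produced without test metadata.
	keyCols := meta.KeyValueMetadata().FindValue("keyCols")
	allCols := meta.KeyValueMetadata().FindValue("allCols")
	if keyCols == nil || allCols == nil {
		return errors.Errorf("parquet file was written without test metadata")
	}

	// Each entry in datums is one row of tree.Datums, ordered as listed in
	// allCols; the last column is the CRDB event type.
	for _, row := range datums {
		_ = row
	}
	return nil
}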
diff --git a/pkg/util/parquet/schema.go b/pkg/util/parquet/schema.go index ea8caf10fd07..76721fe6d8cd 100644 --- a/pkg/util/parquet/schema.go +++ b/pkg/util/parquet/schema.go @@ -71,6 +71,9 @@ func NewSchema(columnNames []string, columnTypes []*types.T) (*SchemaDefinition, fields := make([]schema.Node, 0) for i := 0; i < len(columnNames); i++ { + if columnTypes[i] == nil { + return nil, errors.AssertionFailedf("column %s missing type information", columnNames[i]) + } parquetCol, err := makeColumn(columnNames[i], columnTypes[i], defaultRepetitions) if err != nil { return nil, err @@ -341,6 +344,10 @@ func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (c // and [] when encoding. // There is more info about encoding arrays here: // https://arrow.apache.org/blog/2022/10/08/arrow-parquet-encoding-part-2/ + if typ.ArrayContents().Family() == types.ArrayFamily { + return result, pgerror.Newf(pgcode.FeatureNotSupported, + "parquet writer does not support nested arrays") + } elementCol, err := makeColumn("element", typ.ArrayContents(), parquet.Repetitions.Optional) if err != nil { diff --git a/pkg/util/parquet/testutils.go b/pkg/util/parquet/testutils.go index 9123ce8c7919..d6877f3784ac 100644 --- a/pkg/util/parquet/testutils.go +++ b/pkg/util/parquet/testutils.go @@ -259,6 +259,7 @@ func readBatch[T parquetDatatypes]( continue } arrDatum := &tree.DArray{} + arrDatum.Array = tree.Datums{} result = append(result, arrDatum) // Replevel 0, Deflevel 1 represents an array which is empty. if defLevels[0] == 1 { From 63eccd4a513dfea440a2f7d6176d759e4c8a9eba Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Mon, 15 May 2023 15:20:02 -0400 Subject: [PATCH 2/2] do not merge: always force parquet format in cdc tests if possible Epic: none Release note: None --- pkg/ccl/changefeedccl/testfeed_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index 855de6637277..6685dab5ccf6 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -1082,7 +1082,7 @@ func (f *cloudFeedFactory) Feed( } } randNum := rand.Intn(5) - if randNum < 2 { + if randNum < 5 { parquetPossible = false } if parquetPossible {
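As a final illustration of the guard added to pkg/util/parquet/schema.go in the first patch, the sketch below (not part of either patch) builds a nested array type programmatically just to exercise the error path; the function and variable names are hypothetical.

func rejectNestedArraySketch() error {
	// INT[][] constructed directly, since nested arrays cannot be declared in SQL.
	nested := types.MakeArray(types.IntArray)
	_, err := parquet.NewSchema([]string{"a"}, []*types.T{nested})
	// err is a pgcode.FeatureNotSupported error:
	// "parquet writer does not support nested arrays".
	return err
}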