diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel
index 7671a1c4f881..c180f3344f40 100644
--- a/pkg/ccl/changefeedccl/BUILD.bazel
+++ b/pkg/ccl/changefeedccl/BUILD.bazel
@@ -122,6 +122,7 @@ go_library(
         "//pkg/util/admission/admissionpb",
         "//pkg/util/bitarray",
         "//pkg/util/bufalloc",
+        "//pkg/util/buildutil",
         "//pkg/util/cache",
         "//pkg/util/ctxgroup",
         "//pkg/util/duration",
diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go
index e20522641e10..ef87dadc34c7 100644
--- a/pkg/ccl/changefeedccl/changefeed_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_test.go
@@ -6091,7 +6091,6 @@ func TestChangefeedHandlesRollingRestart(t *testing.T) {
             DistSQL: &execinfra.TestingKnobs{
                 DrainFast: true,
                 Changefeed: &TestingKnobs{
-                    EnableParquetMetadata: true,
                     // Filter out draining nodes; normally we rely on dist sql planner
                     // to do that for us.
                     FilterDrainingNodes: func(
@@ -6282,10 +6281,8 @@ func TestChangefeedPropagatesTerminalError(t *testing.T) {
                 DefaultTestTenant: base.TestTenantDisabled,
                 Knobs: base.TestingKnobs{
                     DistSQL: &execinfra.TestingKnobs{
-                        DrainFast: true,
-                        Changefeed: &TestingKnobs{
-                            EnableParquetMetadata: true,
-                        },
+                        DrainFast:  true,
+                        Changefeed: &TestingKnobs{},
                     },
                     JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
                 },
diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go
index 6be98c32925a..97ee75b2348e 100644
--- a/pkg/ccl/changefeedccl/changefeedbase/options.go
+++ b/pkg/ccl/changefeedccl/changefeedbase/options.go
@@ -443,8 +443,7 @@ var InitialScanOnlyUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, Opt
 // columns, so there is no reason to emit duplicate key datums.
 //
 // TODO(#103129): add support for some of these
-var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptDiff,
-    OptMVCCTimestamps, OptUpdatedTimestamps, OptKeyInValue)
+var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptDiff, OptKeyInValue, OptTopicInValue)

 // AlterChangefeedUnsupportedOptions are changefeed options that we do not allow
 // users to alter.
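Note: the options.go hunk above is the user-visible core of this change. `updated` and `mvcc_timestamp` leave the parquet-unsupported set, while `topic_in_value` joins it (presumably because the cloud storage sink that parquet targets embeds no topic). A minimal sketch of how an `OptionsSet` gate of this shape behaves, assuming `makeStringSet` builds a plain set of option-name strings; the real helper and validation live in `changefeedbase` and are not shown in this diff:

```go
package main

import "fmt"

// makeStringSet is a stand-in for the changefeedbase helper: an OptionsSet
// here is just a set of option names.
func makeStringSet(opts ...string) map[string]struct{} {
    set := make(map[string]struct{}, len(opts))
    for _, o := range opts {
        set[o] = struct{}{}
    }
    return set
}

func main() {
    // The new unsupported set from the hunk above: end_time, diff,
    // key_in_value, topic_in_value. updated and mvcc_timestamp are gone.
    parquetUnsupported := makeStringSet("end_time", "diff", "key_in_value", "topic_in_value")
    for _, opt := range []string{"updated", "mvcc_timestamp", "topic_in_value"} {
        _, unsupported := parquetUnsupported[opt]
        fmt.Printf("%s allowed with format=parquet: %v\n", opt, !unsupported)
    }
}
```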
diff --git a/pkg/ccl/changefeedccl/event_processing.go b/pkg/ccl/changefeedccl/event_processing.go
index f4994e01cda1..a447cbcc250f 100644
--- a/pkg/ccl/changefeedccl/event_processing.go
+++ b/pkg/ccl/changefeedccl/event_processing.go
@@ -58,15 +58,15 @@ type frontier interface{ Frontier() hlc.Timestamp }
 type kvEventToRowConsumer struct {
     frontier
-    encoder        Encoder
-    scratch        bufalloc.ByteAllocator
-    sink           EventSink
-    cursor         hlc.Timestamp
-    knobs          TestingKnobs
-    decoder        cdcevent.Decoder
-    details        ChangefeedConfig
-    evaluator      *cdceval.Evaluator
-    encodingFormat changefeedbase.FormatType
+    encoder      Encoder
+    scratch      bufalloc.ByteAllocator
+    sink         EventSink
+    cursor       hlc.Timestamp
+    knobs        TestingKnobs
+    decoder      cdcevent.Decoder
+    details      ChangefeedConfig
+    evaluator    *cdceval.Evaluator
+    encodingOpts changefeedbase.EncodingOptions

     topicDescriptorCache map[TopicIdentifier]TopicDescriptor
     topicNamer           *TopicNamer
@@ -256,7 +256,7 @@ func newKVEventToRowConsumer(
         topicDescriptorCache: make(map[TopicIdentifier]TopicDescriptor),
         topicNamer:           topicNamer,
         evaluator:            evaluator,
-        encodingFormat:       encodingOpts.Format,
+        encodingOpts:         encodingOpts,
         metrics:              metrics,
         pacer:                pacer,
     }, nil
@@ -429,9 +429,10 @@ func (c *kvEventToRowConsumer) encodeAndEmit(
         }
     }

-    if c.encodingFormat == changefeedbase.OptFormatParquet {
+    if c.encodingOpts.Format == changefeedbase.OptFormatParquet {
         return c.encodeForParquet(
-            ctx, updatedRow, prevRow, topic, schemaTS, updatedRow.MvccTimestamp, alloc,
+            ctx, updatedRow, prevRow, topic, schemaTS, updatedRow.MvccTimestamp,
+            c.encodingOpts, alloc,
         )
     }
     var keyCopy, valueCopy []byte
@@ -478,6 +479,7 @@ func (c *kvEventToRowConsumer) encodeForParquet(
     prevRow cdcevent.Row,
     topic TopicDescriptor,
     updated, mvcc hlc.Timestamp,
+    encodingOpts changefeedbase.EncodingOptions,
     alloc kvevent.Alloc,
 ) error {
     sinkWithEncoder, ok := c.sink.(SinkWithEncoder)
@@ -485,7 +487,7 @@ func (c *kvEventToRowConsumer) encodeForParquet(
         return errors.AssertionFailedf("Expected a SinkWithEncoder for parquet format, found %T", c.sink)
     }
     if err := sinkWithEncoder.EncodeAndEmitRow(
-        ctx, updatedRow, prevRow, topic, updated, mvcc, alloc,
+        ctx, updatedRow, prevRow, topic, updated, mvcc, encodingOpts, alloc,
     ); err != nil {
         return err
     }
diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go
index dfba5ca8dfb0..194f4011e234 100644
--- a/pkg/ccl/changefeedccl/helpers_test.go
+++ b/pkg/ccl/changefeedccl/helpers_test.go
@@ -1085,11 +1085,6 @@ func cdcTestNamedWithSystem(
     // Even if the parquet format is not being used, enable metadata
     // in all tests for simplicity.
     testServer, cleanupServer := makeServerWithOptions(t, options)
-    knobs := testServer.TestingKnobs.
-        DistSQL.(*execinfra.TestingKnobs).
-        Changefeed.(*TestingKnobs)
-    knobs.EnableParquetMetadata = true
-
     feedFactory, cleanupSink := makeFeedFactoryWithOptions(t, sinkType, testServer.Server, testServer.DB, options)
     feedFactory = maybeUseExternalConnection(feedFactory, testServer.DB, sinkType, options, t)
     defer cleanupServer()
diff --git a/pkg/ccl/changefeedccl/parquet.go b/pkg/ccl/changefeedccl/parquet.go
index d3a8b1726321..9e50c8360adb 100644
--- a/pkg/ccl/changefeedccl/parquet.go
+++ b/pkg/ccl/changefeedccl/parquet.go
@@ -15,20 +15,31 @@ import (
     "strings"

     "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
+    "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
     "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
     "github.com/cockroachdb/cockroach/pkg/sql/types"
+    "github.com/cockroachdb/cockroach/pkg/util/buildutil"
+    "github.com/cockroachdb/cockroach/pkg/util/hlc"
     "github.com/cockroachdb/cockroach/pkg/util/parquet"
     "github.com/cockroachdb/errors"
 )

+// includeParquestTestMetadata configures the parquet writer to write
+// metadata required for reading parquet files in tests.
+var includeParquestTestMetadata = buildutil.CrdbTestBuild
+
 type parquetWriter struct {
-    inner      *parquet.Writer
-    datumAlloc []tree.Datum
+    inner        *parquet.Writer
+    encodingOpts changefeedbase.EncodingOptions
+    schemaDef    *parquet.SchemaDefinition
+    datumAlloc   []tree.Datum
 }

 // newParquetSchemaDefintion returns a parquet schema definition based on the
 // cdcevent.Row and the number of cols in the schema.
-func newParquetSchemaDefintion(row cdcevent.Row) (*parquet.SchemaDefinition, int, error) {
+func newParquetSchemaDefintion(
+    row cdcevent.Row, encodingOpts changefeedbase.EncodingOptions,
+) (*parquet.SchemaDefinition, error) {
     var columnNames []string
     var columnTypes []*types.T

@@ -39,18 +50,37 @@ func newParquetSchemaDefintion(row cdcevent.Row) (*parquet.SchemaDefinition, int
         numCols += 1
         return nil
     }); err != nil {
-        return nil, 0, err
+        return nil, err
     }

     columnNames = append(columnNames, parquetCrdbEventTypeColName)
     columnTypes = append(columnTypes, types.String)
     numCols += 1

+    columnNames, columnTypes = appendMetadataColsToSchema(columnNames, columnTypes, encodingOpts)
+
     schemaDef, err := parquet.NewSchema(columnNames, columnTypes)
     if err != nil {
-        return nil, 0, err
+        return nil, err
+    }
+    return schemaDef, nil
+}
+
+const parquetOptUpdatedTimestampColName = metaSentinel + changefeedbase.OptUpdatedTimestamps
+const parquetOptMVCCTimestampColName = metaSentinel + changefeedbase.OptMVCCTimestamps
+
+func appendMetadataColsToSchema(
+    columnNames []string, columnTypes []*types.T, encodingOpts changefeedbase.EncodingOptions,
+) (updatedNames []string, updatedTypes []*types.T) {
+    if encodingOpts.UpdatedTimestamps {
+        columnNames = append(columnNames, parquetOptUpdatedTimestampColName)
+        columnTypes = append(columnTypes, types.String)
     }
-    return schemaDef, numCols, nil
+    if encodingOpts.MVCCTimestamps {
+        columnNames = append(columnNames, parquetOptMVCCTimestampColName)
+        columnTypes = append(columnTypes, types.String)
+    }
+    return columnNames, columnTypes
 }

 // newParquetWriterFromRow constructs a new parquet writer which outputs to
@@ -58,30 +88,37 @@ func newParquetSchemaDefintion(row cdcevent.Row) (*parquet.SchemaDefinition, int
 func newParquetWriterFromRow(
     row cdcevent.Row,
     sink io.Writer,
-    knobs *TestingKnobs, /* may be nil */
+    encodingOpts changefeedbase.EncodingOptions,
     opts ...parquet.Option,
 ) (*parquetWriter, error) {
-    schemaDef, numCols, err := newParquetSchemaDefintion(row)
+    schemaDef, err := newParquetSchemaDefintion(row, encodingOpts)
     if err != nil {
         return nil, err
     }

-    if knobs != nil && knobs.EnableParquetMetadata {
-        if opts, err = addParquetTestMetadata(row, opts); err != nil {
+    if includeParquestTestMetadata {
+        if opts, err = addParquetTestMetadata(row, encodingOpts, opts); err != nil {
             return nil, err
         }
     }
-    writer, err := newParquetWriter(schemaDef, sink, knobs, opts...)
+    writer, err := newParquetWriter(schemaDef, sink, opts...)
     if err != nil {
         return nil, err
     }
-    return &parquetWriter{inner: writer, datumAlloc: make([]tree.Datum, numCols)}, nil
+    return &parquetWriter{
+        inner:        writer,
+        encodingOpts: encodingOpts,
+        schemaDef:    schemaDef,
+        datumAlloc:   make([]tree.Datum, schemaDef.NumColumns()),
+    }, nil
 }

 // addData writes the updatedRow, adding the row's event type. There is no guarantee
 // that data will be flushed after this function returns.
-func (w *parquetWriter) addData(updatedRow cdcevent.Row, prevRow cdcevent.Row) error {
-    if err := populateDatums(updatedRow, prevRow, w.datumAlloc); err != nil {
+func (w *parquetWriter) addData(
+    updatedRow cdcevent.Row, prevRow cdcevent.Row, updated, mvcc hlc.Timestamp,
+) error {
+    if err := populateDatums(updatedRow, prevRow, w.encodingOpts, updated, mvcc, w.datumAlloc); err != nil {
         return err
     }

@@ -94,7 +131,13 @@ func (w *parquetWriter) close() error {
 }

 // populateDatums writes the appropriate datums into the datumAlloc slice.
-func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc []tree.Datum) error {
+func populateDatums(
+    updatedRow cdcevent.Row,
+    prevRow cdcevent.Row,
+    encodingOpts changefeedbase.EncodingOptions,
+    updated, mvcc hlc.Timestamp,
+    datumAlloc []tree.Datum,
+) error {
     datums := datumAlloc[:0]

     if err := updatedRow.ForAllColumns().Datum(func(d tree.Datum, _ cdcevent.ResultColumn) error {
@@ -104,6 +147,13 @@ func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc []
         return err
     }
     datums = append(datums, getEventTypeDatum(updatedRow, prevRow).DString())
+
+    if encodingOpts.UpdatedTimestamps {
+        datums = append(datums, tree.NewDString(timestampToString(updated)))
+    }
+    if encodingOpts.MVCCTimestamps {
+        datums = append(datums, tree.NewDString(timestampToString(mvcc)))
+    }
     return nil
 }

@@ -114,7 +164,9 @@ func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc []
 // `[0]->{"b": "b", "c": "c"}` with the key columns in square brackets and value
 // columns in a JSON object. The metadata generated by this function contains
 // key and value column names along with their offsets in the parquet file.
-func addParquetTestMetadata(row cdcevent.Row, opts []parquet.Option) ([]parquet.Option, error) {
+func addParquetTestMetadata(
+    row cdcevent.Row, encodingOpts changefeedbase.EncodingOptions, parquetOpts []parquet.Option,
+) ([]parquet.Option, error) {
     // NB: Order matters. When iterating using ForAllColumns, which is used when
     // writing datums and defining the schema, the order of columns usually
     // matches the underlying table. If a composite key is defined, the order in
@@ -127,7 +179,7 @@
         keysInOrder = append(keysInOrder, col.Name)
         return nil
     }); err != nil {
-        return opts, err
+        return parquetOpts, err
     }

     // NB: We do not use ForAllColumns here because it will always contain the
@@ -141,7 +193,7 @@
         valuesInOrder = append(valuesInOrder, col.Name)
         return nil
     }); err != nil {
-        return opts, err
+        return parquetOpts, err
     }

     // Iterate over ForAllColumns to determine the offsets of each column
@@ -160,15 +212,26 @@
         idx += 1
         return nil
     }); err != nil {
-        return opts, err
+        return parquetOpts, err
     }
     valuesInOrder = append(valuesInOrder, parquetCrdbEventTypeColName)
     valueCols[parquetCrdbEventTypeColName] = idx
     idx += 1

-    opts = append(opts, parquet.WithMetadata(map[string]string{"keyCols": serializeMap(keysInOrder, keyCols)}))
-    opts = append(opts, parquet.WithMetadata(map[string]string{"allCols": serializeMap(valuesInOrder, valueCols)}))
-    return opts, nil
+    if encodingOpts.UpdatedTimestamps {
+        valuesInOrder = append(valuesInOrder, parquetOptUpdatedTimestampColName)
+        valueCols[parquetOptUpdatedTimestampColName] = idx
+        idx += 1
+    }
+    if encodingOpts.MVCCTimestamps {
+        valuesInOrder = append(valuesInOrder, parquetOptMVCCTimestampColName)
+        valueCols[parquetOptMVCCTimestampColName] = idx
+        idx += 1
+    }
+
+    parquetOpts = append(parquetOpts, parquet.WithMetadata(map[string]string{"keyCols": serializeMap(keysInOrder, keyCols)}))
+    parquetOpts = append(parquetOpts, parquet.WithMetadata(map[string]string{"allCols": serializeMap(valuesInOrder, valueCols)}))
+    return parquetOpts, nil
 }

 // serializeMap serializes a map to a string. For example, orderedKeys=["b",
@@ -213,12 +276,9 @@ func deserializeMap(s string) (orderedKeys []string, m map[string]int, err error
 // newParquetWriter allocates a new parquet writer using the provided
 // schema definition.
 func newParquetWriter(
-    sch *parquet.SchemaDefinition,
-    sink io.Writer,
-    knobs *TestingKnobs, /* may be nil */
-    opts ...parquet.Option,
+    sch *parquet.SchemaDefinition, sink io.Writer, opts ...parquet.Option,
 ) (*parquet.Writer, error) {
-    if knobs != nil && knobs.EnableParquetMetadata {
+    if includeParquestTestMetadata {
         // To use parquet test utils for reading datums, the writer needs to be
         // configured with additional metadata.
         return parquet.NewWriterWithReaderMeta(sch, sink, opts...)
diff --git a/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go b/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go
index 1ca96cccab84..007182be03a2 100644
--- a/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go
+++ b/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go
@@ -15,6 +15,7 @@ import (
     "path/filepath"

     "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
+    "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
     "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
     "github.com/cockroachdb/cockroach/pkg/cloud"
     "github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
@@ -29,7 +30,7 @@ import (
 // This is an extra column that will be added to every parquet file which tells
 // us about the type of event that generated a particular row. The types are
 // defined below.
-const parquetCrdbEventTypeColName string = "__crdb__event_type__"
+const parquetCrdbEventTypeColName string = metaSentinel + "event_type"

 type parquetEventType int

@@ -147,7 +148,7 @@ func (parquetSink *parquetCloudStorageSink) EmitResolvedTimestamp(
     // TODO: Ideally, we do not create a new schema and writer every time
     // we emit a resolved timestamp. Currently, util/parquet does not support it.
-    writer, err := newParquetWriter(sch, &buf, parquetSink.wrapped.testingKnobs)
+    writer, err := newParquetWriter(sch, &buf)
     if err != nil {
         return err
     }
@@ -183,6 +184,7 @@ func (parquetSink *parquetCloudStorageSink) EncodeAndEmitRow(
     prevRow cdcevent.Row,
     topic TopicDescriptor,
     updated, mvcc hlc.Timestamp,
+    encodingOpts changefeedbase.EncodingOptions,
     alloc kvevent.Alloc,
 ) error {
     s := parquetSink.wrapped
@@ -195,14 +197,14 @@
     if file.parquetCodec == nil {
         var err error
         file.parquetCodec, err = newParquetWriterFromRow(
-            updatedRow, &file.buf, parquetSink.wrapped.testingKnobs,
+            updatedRow, &file.buf, encodingOpts,
             parquet.WithCompressionCodec(parquetSink.compression))
         if err != nil {
             return err
         }
     }

-    if err := file.parquetCodec.addData(updatedRow, prevRow); err != nil {
+    if err := file.parquetCodec.addData(updatedRow, prevRow, updated, mvcc); err != nil {
         return err
     }
diff --git a/pkg/ccl/changefeedccl/parquet_test.go b/pkg/ccl/changefeedccl/parquet_test.go
index 0f1772ea4a5d..a55fd3ca8839 100644
--- a/pkg/ccl/changefeedccl/parquet_test.go
+++ b/pkg/ccl/changefeedccl/parquet_test.go
@@ -26,6 +26,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
     "github.com/cockroachdb/cockroach/pkg/testutils/skip"
     "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+    "github.com/cockroachdb/cockroach/pkg/util/hlc"
     "github.com/cockroachdb/cockroach/pkg/util/leaktest"
     "github.com/cockroachdb/cockroach/pkg/util/log"
     "github.com/cockroachdb/cockroach/pkg/util/parquet"
@@ -120,8 +121,10 @@ func TestParquetRows(t *testing.T) {
                 ctx, roachpb.KeyValue{Key: v.Key, Value: v.PrevValue}, cdcevent.PrevRow, v.Timestamp(), false)
             require.NoError(t, err)

+            encodingOpts := changefeedbase.EncodingOptions{}
+
             if writer == nil {
-                writer, err = newParquetWriterFromRow(updatedRow, f, &TestingKnobs{EnableParquetMetadata: true}, parquet.WithMaxRowGroupLength(maxRowGroupSize),
+                writer, err = newParquetWriterFromRow(updatedRow, f, encodingOpts, parquet.WithMaxRowGroupLength(maxRowGroupSize),
                     parquet.WithCompressionCodec(parquet.CompressionGZIP))
                 if err != nil {
                     t.Fatalf(err.Error())
@@ -129,12 +132,12 @@
                 numCols = len(updatedRow.ResultColumns()) + 1
             }

-            err = writer.addData(updatedRow, prevRow)
+            err = writer.addData(updatedRow, prevRow, hlc.Timestamp{}, hlc.Timestamp{})
             require.NoError(t, err)

             // Save a copy of the datums we wrote.
-            datumRow := make([]tree.Datum, len(updatedRow.ResultColumns())+1)
-            err = populateDatums(updatedRow, prevRow, datumRow)
+            datumRow := make([]tree.Datum, writer.schemaDef.NumColumns())
+            err = populateDatums(updatedRow, prevRow, encodingOpts, hlc.Timestamp{}, hlc.Timestamp{}, datumRow)
             require.NoError(t, err)

             datums[i] = datumRow
         }
diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go
index 5269d6c213f4..0d730a835f4a 100644
--- a/pkg/ccl/changefeedccl/sink.go
+++ b/pkg/ccl/changefeedccl/sink.go
@@ -450,10 +450,11 @@ func (s errorWrapperSink) EncodeAndEmitRow(
     prevRow cdcevent.Row,
     topic TopicDescriptor,
     updated, mvcc hlc.Timestamp,
+    encodingOpts changefeedbase.EncodingOptions,
     alloc kvevent.Alloc,
 ) error {
     if sinkWithEncoder, ok := s.wrapped.(SinkWithEncoder); ok {
-        return sinkWithEncoder.EncodeAndEmitRow(ctx, updatedRow, prevRow, topic, updated, mvcc, alloc)
+        return sinkWithEncoder.EncodeAndEmitRow(ctx, updatedRow, prevRow, topic, updated, mvcc, encodingOpts, alloc)
     }
     return errors.AssertionFailedf("Expected a sink with encoder for, found %T", s.wrapped)
 }
@@ -716,6 +717,7 @@ type SinkWithEncoder interface {
         prevRow cdcevent.Row,
         topic TopicDescriptor,
         updated, mvcc hlc.Timestamp,
+        encodingOpts changefeedbase.EncodingOptions,
         alloc kvevent.Alloc,
     ) error
diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go
index de6662574316..c5df6e32538d 100644
--- a/pkg/ccl/changefeedccl/testfeed_test.go
+++ b/pkg/ccl/changefeedccl/testfeed_test.go
@@ -40,7 +40,6 @@ import (
     "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
     "github.com/cockroachdb/cockroach/pkg/settings/cluster"
     "github.com/cockroachdb/cockroach/pkg/sql/distsql"
-    "github.com/cockroachdb/cockroach/pkg/sql/execinfra"
     "github.com/cockroachdb/cockroach/pkg/sql/parser"
     "github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
     "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -624,10 +623,11 @@ func (s *notifyFlushSink) EncodeAndEmitRow(
     prevRow cdcevent.Row,
     topic TopicDescriptor,
     updated, mvcc hlc.Timestamp,
+    encodingOpts changefeedbase.EncodingOptions,
     alloc kvevent.Alloc,
 ) error {
     if sinkWithEncoder, ok := s.Sink.(SinkWithEncoder); ok {
-        return sinkWithEncoder.EncodeAndEmitRow(ctx, updatedRow, prevRow, topic, updated, mvcc, alloc)
+        return sinkWithEncoder.EncodeAndEmitRow(ctx, updatedRow, prevRow, topic, updated, mvcc, encodingOpts, alloc)
     }
     return errors.AssertionFailedf("Expected a sink with encoder for, found %T", s.Sink)
 }
@@ -1053,11 +1053,7 @@ func (f *cloudFeedFactory) Feed(
     // Determine if we can enable the parquet format if the changefeed is not
     // being created with incompatible options. If it can be enabled, we will use
     // parquet format with a probability of 0.4.
-    //
-    // TODO: Consider making this knob a global flag so tests that don't
-    // initialize testing knobs can use parquet metamorphically.
-    knobs := f.s.TestingKnobs().DistSQL.(*execinfra.TestingKnobs).Changefeed
-    parquetPossible := knobs != nil && knobs.(*TestingKnobs).EnableParquetMetadata
+    parquetPossible := includeParquestTestMetadata

     explicitEnvelope := false
     for _, opt := range createStmt.Options {
         if string(opt.Key) == changefeedbase.OptEnvelope {
@@ -1250,18 +1246,28 @@ func (c *cloudFeed) appendParquetTestFeedMessages(
         return err
     }

-    for _, row := range datums {
-        rowCopy := make([]string, len(valueColumnNamesOrdered)-1)
-        copy(rowCopy, valueColumnNamesOrdered[:len(valueColumnNamesOrdered)-1])
-        rowJSONBuilder, err := json.NewFixedKeysObjectBuilder(rowCopy)
-        if err != nil {
-            return err
+    // Extract metadata columns into metaColumnNameSet.
+    extractMetaColumns := func(columnNameSet map[string]int) map[string]int {
+        metaColumnNameSet := make(map[string]int)
+        for colName, colIdx := range columnNameSet {
+            switch colName {
+            case parquetCrdbEventTypeColName:
+                metaColumnNameSet[colName] = colIdx
+            case parquetOptUpdatedTimestampColName:
+                metaColumnNameSet[colName] = colIdx
+            case parquetOptMVCCTimestampColName:
+                metaColumnNameSet[colName] = colIdx
+            default:
+            }
         }
+        return metaColumnNameSet
+    }
+    metaColumnNameSet := extractMetaColumns(columnNameSet)

+    for _, row := range datums {
+        rowJSONBuilder := json.NewObjectBuilder(len(valueColumnNamesOrdered) - len(metaColumnNameSet))
         keyJSONBuilder := json.NewArrayBuilder(len(primaryKeysNamesOrdered))

-        isDeleted := false
-
         for _, primaryKeyColumnName := range primaryKeysNamesOrdered {
             datum := row[primaryKeyColumnSet[primaryKeyColumnName]]
             j, err := tree.AsJSON(datum, sessiondatapb.DataConversionConfig{}, time.UTC)
@@ -1272,28 +1278,26 @@
         }

         for _, valueColumnName := range valueColumnNamesOrdered {
-            if valueColumnName == parquetCrdbEventTypeColName {
-                if *(row[columnNameSet[valueColumnName]].(*tree.DString)) == *parquetEventDelete.DString() {
-                    isDeleted = true
-                }
-                break
+            if _, isMeta := metaColumnNameSet[valueColumnName]; isMeta {
+                continue
             }
+
             datum := row[columnNameSet[valueColumnName]]
             j, err := tree.AsJSON(datum, sessiondatapb.DataConversionConfig{}, time.UTC)
             if err != nil {
                 return err
             }
-            if err := rowJSONBuilder.Set(valueColumnName, j); err != nil {
-                return err
-            }
+            rowJSONBuilder.Add(valueColumnName, j)
         }

-        var valueWithAfter *json.FixedKeysObjectBuilder
+        var valueWithAfter *json.ObjectBuilder
+
+        isDeleted := *(row[metaColumnNameSet[parquetCrdbEventTypeColName]].(*tree.DString)) == *parquetEventDelete.DString()

         if envelopeType == changefeedbase.OptEnvelopeBare {
             valueWithAfter = rowJSONBuilder
         } else {
-            valueWithAfter, err = json.NewFixedKeysObjectBuilder([]string{"after"})
+            valueWithAfter = json.NewObjectBuilder(1)
             if err != nil {
                 return err
             }
@@ -1302,26 +1306,31 @@
             if err != nil {
                 return err
             }
-            if err = valueWithAfter.Set("after", nullJSON); err != nil {
-                return err
-            }
+            valueWithAfter.Add("after", nullJSON)
         } else {
-            vbJson, err := rowJSONBuilder.Build()
+            vbJson := rowJSONBuilder.Build()
+            valueWithAfter.Add("after", vbJson)
+        }
+
+        if updatedColIdx, updated := metaColumnNameSet[parquetOptUpdatedTimestampColName]; updated {
+            j, err := tree.AsJSON(row[updatedColIdx], sessiondatapb.DataConversionConfig{}, time.UTC)
             if err != nil {
                 return err
             }
-            if err = valueWithAfter.Set("after", vbJson); err != nil {
+            valueWithAfter.Add(changefeedbase.OptUpdatedTimestamps, j)
+        }
+        if mvccColIdx, mvcc := metaColumnNameSet[parquetOptMVCCTimestampColName]; mvcc {
+            j, err := tree.AsJSON(row[mvccColIdx], sessiondatapb.DataConversionConfig{}, time.UTC)
+            if err != nil {
                 return err
             }
+            valueWithAfter.Add(changefeedbase.OptMVCCTimestamps, j)
         }
         }

         keyJSON := keyJSONBuilder.Build()
-        rowJSON, err := valueWithAfter.Build()
-        if err != nil {
-            return err
-        }
+        rowJSON := valueWithAfter.Build()

         var keyBuf bytes.Buffer
         keyJSON.Format(&keyBuf)
diff --git a/pkg/ccl/changefeedccl/testing_knobs.go b/pkg/ccl/changefeedccl/testing_knobs.go
index d49815132ae1..f50f775864db 100644
--- a/pkg/ccl/changefeedccl/testing_knobs.go
+++ b/pkg/ccl/changefeedccl/testing_knobs.go
@@ -74,10 +74,6 @@ type TestingKnobs struct {
     // OnDrain returns the channel to select on to detect node drain
     OnDrain func() <-chan struct{}
-
-    // EnableParquetMetadata configures the parquet format writer to write
-    // metadata which is required for testing.
-    EnableParquetMetadata bool
 }

 // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
diff --git a/pkg/util/parquet/schema.go b/pkg/util/parquet/schema.go
index 5ffb4e650b5a..920550b9ee18 100644
--- a/pkg/util/parquet/schema.go
+++ b/pkg/util/parquet/schema.go
@@ -72,6 +72,10 @@ type SchemaDefinition struct {
     schema *schema.Schema
 }

+func (sd *SchemaDefinition) NumColumns() int {
+    return len(sd.cols)
+}
+
 // NewSchema generates a SchemaDefinition.
 //
 // Columns in the returned SchemaDefinition will match the order they appear in
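Note: taken together, the parquet.go and testfeed_test.go hunks implement a single scheme. Each optional metadatum becomes an extra string column appended after the event-type column, named with a sentinel prefix so it cannot collide with user columns, and `populateDatums` appends the formatted timestamps in the same order so each row stays aligned with the schema. In test builds (`buildutil.CrdbTestBuild`) the writer additionally records column offsets in the file metadata so the test feed can split user columns from metadata columns again. A self-contained sketch of the naming scheme follows; the `"__crdb__"` value of `metaSentinel` is an assumption inferred from the old `__crdb__event_type__` constant and is not shown in this diff:

```go
package main

import "fmt"

// metaSentinel is assumed to be the "__crdb__" prefix implied by
// parquetCrdbEventTypeColName above; treat it as illustrative.
const metaSentinel = "__crdb__"

type encodingOptions struct {
    UpdatedTimestamps bool
    MVCCTimestamps    bool
}

// appendMetadataCols mirrors appendMetadataColsToSchema from the diff:
// optional metadata columns are appended after the event-type column so
// user columns keep their original offsets.
func appendMetadataCols(names []string, opts encodingOptions) []string {
    if opts.UpdatedTimestamps {
        names = append(names, metaSentinel+"updated")
    }
    if opts.MVCCTimestamps {
        names = append(names, metaSentinel+"mvcc_timestamp")
    }
    return names
}

func main() {
    schema := []string{"a", "b", metaSentinel + "event_type"}
    schema = appendMetadataCols(schema, encodingOptions{UpdatedTimestamps: true, MVCCTimestamps: true})
    fmt.Println(schema)
    // Output: [a b __crdb__event_type __crdb__updated __crdb__mvcc_timestamp]
}
```

Storing the timestamps as strings keeps the parquet schema simple and matches how `populateDatums` renders them via `tree.NewDString(timestampToString(...))` in the diff above.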