Skip to content

Commit

Permalink
Merge #104407
Browse files Browse the repository at this point in the history
104407: changefeedccl: support the updated and mvcc with parquet format r=miretskiy a=jayshrivastava

#### changefeedccl: use buildutil.CrdbTestBuild for parquet testing

Parquet testing requires that extra metadata be written to parquet files
so tests can create CDC rows from the raw data. Previously, the production
parquet code relied on a testing knob to be passed to write this extra
metadata. This is problematic as not all tests would pass the testing knob,
making it so that we could not randomly use parquet in those tests for
metamorphic testing purposes. With this change, the parquet production code
uses `buildutil.CrdbTestBuild`, which is a global flag enabled in tests.
Now, metamorphic parquet testing can be applied to more tests.

Epic: None
Release note: None

---

#### changefeedccl: support the updated and mvcc timestamp options with parquet format

Previously, using the parquet format in changefeeds did not support
using the `mvcc` or `updated` options. This change adds support for using
these options with parquet.

Epic: None
Release note: None

---

#### do not merge: force parquet when possible during tests

This change will not be merged. When it is possible for tests
to use parquet, this change forces them to use parquet.

This change also forces tests for updated timestamps and mvcc
timestamps to use the cloudstorage sink + parquet format.

Release note: None
Epic: None

Co-authored-by: Jayant Shrivastava <jayants@cockroachlabs.com>
  • Loading branch information
craig[bot] and jayshrivastava committed Jun 7, 2023
2 parents 473592f + 3045e41 commit af78952
Show file tree
Hide file tree
Showing 12 changed files with 169 additions and 99 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ go_library(
"//pkg/util/admission/admissionpb",
"//pkg/util/bitarray",
"//pkg/util/bufalloc",
"//pkg/util/buildutil",
"//pkg/util/cache",
"//pkg/util/ctxgroup",
"//pkg/util/duration",
Expand Down
7 changes: 2 additions & 5 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6091,7 +6091,6 @@ func TestChangefeedHandlesRollingRestart(t *testing.T) {
DistSQL: &execinfra.TestingKnobs{
DrainFast: true,
Changefeed: &TestingKnobs{
EnableParquetMetadata: true,
// Filter out draining nodes; normally we rely on dist sql planner
// to do that for us.
FilterDrainingNodes: func(
Expand Down Expand Up @@ -6282,10 +6281,8 @@ func TestChangefeedPropagatesTerminalError(t *testing.T) {
DefaultTestTenant: base.TestTenantDisabled,
Knobs: base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
DrainFast: true,
Changefeed: &TestingKnobs{
EnableParquetMetadata: true,
},
DrainFast: true,
Changefeed: &TestingKnobs{},
},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
},
Expand Down
3 changes: 1 addition & 2 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,8 +443,7 @@ var InitialScanOnlyUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, Opt
// columns, so there is no reason to emit duplicate key datums.
//
// TODO(#103129): add support for some of these
var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptDiff,
OptMVCCTimestamps, OptUpdatedTimestamps, OptKeyInValue)
var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptDiff, OptKeyInValue, OptTopicInValue)

// AlterChangefeedUnsupportedOptions are changefeed options that we do not allow
// users to alter.
Expand Down
28 changes: 15 additions & 13 deletions pkg/ccl/changefeedccl/event_processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@ type frontier interface{ Frontier() hlc.Timestamp }

type kvEventToRowConsumer struct {
frontier
encoder Encoder
scratch bufalloc.ByteAllocator
sink EventSink
cursor hlc.Timestamp
knobs TestingKnobs
decoder cdcevent.Decoder
details ChangefeedConfig
evaluator *cdceval.Evaluator
encodingFormat changefeedbase.FormatType
encoder Encoder
scratch bufalloc.ByteAllocator
sink EventSink
cursor hlc.Timestamp
knobs TestingKnobs
decoder cdcevent.Decoder
details ChangefeedConfig
evaluator *cdceval.Evaluator
encodingOpts changefeedbase.EncodingOptions

topicDescriptorCache map[TopicIdentifier]TopicDescriptor
topicNamer *TopicNamer
Expand Down Expand Up @@ -256,7 +256,7 @@ func newKVEventToRowConsumer(
topicDescriptorCache: make(map[TopicIdentifier]TopicDescriptor),
topicNamer: topicNamer,
evaluator: evaluator,
encodingFormat: encodingOpts.Format,
encodingOpts: encodingOpts,
metrics: metrics,
pacer: pacer,
}, nil
Expand Down Expand Up @@ -429,9 +429,10 @@ func (c *kvEventToRowConsumer) encodeAndEmit(
}
}

if c.encodingFormat == changefeedbase.OptFormatParquet {
if c.encodingOpts.Format == changefeedbase.OptFormatParquet {
return c.encodeForParquet(
ctx, updatedRow, prevRow, topic, schemaTS, updatedRow.MvccTimestamp, alloc,
ctx, updatedRow, prevRow, topic, schemaTS, updatedRow.MvccTimestamp,
c.encodingOpts, alloc,
)
}
var keyCopy, valueCopy []byte
Expand Down Expand Up @@ -478,14 +479,15 @@ func (c *kvEventToRowConsumer) encodeForParquet(
prevRow cdcevent.Row,
topic TopicDescriptor,
updated, mvcc hlc.Timestamp,
encodingOpts changefeedbase.EncodingOptions,
alloc kvevent.Alloc,
) error {
sinkWithEncoder, ok := c.sink.(SinkWithEncoder)
if !ok {
return errors.AssertionFailedf("Expected a SinkWithEncoder for parquet format, found %T", c.sink)
}
if err := sinkWithEncoder.EncodeAndEmitRow(
ctx, updatedRow, prevRow, topic, updated, mvcc, alloc,
ctx, updatedRow, prevRow, topic, updated, mvcc, encodingOpts, alloc,
); err != nil {
return err
}
Expand Down
5 changes: 0 additions & 5 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1085,11 +1085,6 @@ func cdcTestNamedWithSystem(
// Even if the parquet format is not being used, enable metadata
// in all tests for simplicity.
testServer, cleanupServer := makeServerWithOptions(t, options)
knobs := testServer.TestingKnobs.
DistSQL.(*execinfra.TestingKnobs).
Changefeed.(*TestingKnobs)
knobs.EnableParquetMetadata = true

feedFactory, cleanupSink := makeFeedFactoryWithOptions(t, sinkType, testServer.Server, testServer.DB, options)
feedFactory = maybeUseExternalConnection(feedFactory, testServer.DB, sinkType, options, t)
defer cleanupServer()
Expand Down
114 changes: 87 additions & 27 deletions pkg/ccl/changefeedccl/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,31 @@ import (
"strings"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/parquet"
"github.com/cockroachdb/errors"
)

// includeParquestTestMetadata configures the parquet writer to write
// metadata required for reading parquet files in tests.
var includeParquestTestMetadata = buildutil.CrdbTestBuild

type parquetWriter struct {
inner *parquet.Writer
datumAlloc []tree.Datum
inner *parquet.Writer
encodingOpts changefeedbase.EncodingOptions
schemaDef *parquet.SchemaDefinition
datumAlloc []tree.Datum
}

// newParquetSchemaDefintion returns a parquet schema definition based on the
// cdcevent.Row, including any metadata columns implied by the encoding options.
func newParquetSchemaDefintion(row cdcevent.Row) (*parquet.SchemaDefinition, int, error) {
func newParquetSchemaDefintion(
row cdcevent.Row, encodingOpts changefeedbase.EncodingOptions,
) (*parquet.SchemaDefinition, error) {
var columnNames []string
var columnTypes []*types.T

Expand All @@ -39,49 +50,75 @@ func newParquetSchemaDefintion(row cdcevent.Row) (*parquet.SchemaDefinition, int
numCols += 1
return nil
}); err != nil {
return nil, 0, err
return nil, err
}

columnNames = append(columnNames, parquetCrdbEventTypeColName)
columnTypes = append(columnTypes, types.String)
numCols += 1

columnNames, columnTypes = appendMetadataColsToSchema(columnNames, columnTypes, encodingOpts)

schemaDef, err := parquet.NewSchema(columnNames, columnTypes)
if err != nil {
return nil, 0, err
return nil, err
}
return schemaDef, nil
}

const parquetOptUpdatedTimestampColName = metaSentinel + changefeedbase.OptUpdatedTimestamps
const parquetOptMVCCTimestampColName = metaSentinel + changefeedbase.OptMVCCTimestamps

func appendMetadataColsToSchema(
columnNames []string, columnTypes []*types.T, encodingOpts changefeedbase.EncodingOptions,
) (updatedNames []string, updatedTypes []*types.T) {
if encodingOpts.UpdatedTimestamps {
columnNames = append(columnNames, parquetOptUpdatedTimestampColName)
columnTypes = append(columnTypes, types.String)
}
return schemaDef, numCols, nil
if encodingOpts.MVCCTimestamps {
columnNames = append(columnNames, parquetOptMVCCTimestampColName)
columnTypes = append(columnTypes, types.String)
}
return columnNames, columnTypes
}

// newParquetWriterFromRow constructs a new parquet writer which outputs to
// the given sink. This function interprets the schema from the supplied row.
func newParquetWriterFromRow(
row cdcevent.Row,
sink io.Writer,
knobs *TestingKnobs, /* may be nil */
encodingOpts changefeedbase.EncodingOptions,
opts ...parquet.Option,
) (*parquetWriter, error) {
schemaDef, numCols, err := newParquetSchemaDefintion(row)
schemaDef, err := newParquetSchemaDefintion(row, encodingOpts)
if err != nil {
return nil, err
}

if knobs != nil && knobs.EnableParquetMetadata {
if opts, err = addParquetTestMetadata(row, opts); err != nil {
if includeParquestTestMetadata {
if opts, err = addParquetTestMetadata(row, encodingOpts, opts); err != nil {
return nil, err
}
}
writer, err := newParquetWriter(schemaDef, sink, knobs, opts...)
writer, err := newParquetWriter(schemaDef, sink, opts...)
if err != nil {
return nil, err
}
return &parquetWriter{inner: writer, datumAlloc: make([]tree.Datum, numCols)}, nil
return &parquetWriter{
inner: writer,
encodingOpts: encodingOpts,
schemaDef: schemaDef,
datumAlloc: make([]tree.Datum, schemaDef.NumColumns()),
}, nil
}

// addData writes the updatedRow, adding the row's event type. There is no guarantee
// that data will be flushed after this function returns.
func (w *parquetWriter) addData(updatedRow cdcevent.Row, prevRow cdcevent.Row) error {
if err := populateDatums(updatedRow, prevRow, w.datumAlloc); err != nil {
func (w *parquetWriter) addData(
updatedRow cdcevent.Row, prevRow cdcevent.Row, updated, mvcc hlc.Timestamp,
) error {
if err := populateDatums(updatedRow, prevRow, w.encodingOpts, updated, mvcc, w.datumAlloc); err != nil {
return err
}

Expand All @@ -94,7 +131,13 @@ func (w *parquetWriter) close() error {
}

// populateDatums writes the appropriate datums into the datumAlloc slice.
func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc []tree.Datum) error {
func populateDatums(
updatedRow cdcevent.Row,
prevRow cdcevent.Row,
encodingOpts changefeedbase.EncodingOptions,
updated, mvcc hlc.Timestamp,
datumAlloc []tree.Datum,
) error {
datums := datumAlloc[:0]

if err := updatedRow.ForAllColumns().Datum(func(d tree.Datum, _ cdcevent.ResultColumn) error {
Expand All @@ -104,6 +147,13 @@ func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc []
return err
}
datums = append(datums, getEventTypeDatum(updatedRow, prevRow).DString())

if encodingOpts.UpdatedTimestamps {
datums = append(datums, tree.NewDString(timestampToString(updated)))
}
if encodingOpts.MVCCTimestamps {
datums = append(datums, tree.NewDString(timestampToString(mvcc)))
}
return nil
}

Expand All @@ -114,7 +164,9 @@ func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc []
// `[0]->{"b": "b", "c": "c"}` with the key columns in square brackets and value
// columns in a JSON object. The metadata generated by this function contains
// key and value column names along with their offsets in the parquet file.
func addParquetTestMetadata(row cdcevent.Row, opts []parquet.Option) ([]parquet.Option, error) {
func addParquetTestMetadata(
row cdcevent.Row, encodingOpts changefeedbase.EncodingOptions, parquetOpts []parquet.Option,
) ([]parquet.Option, error) {
// NB: Order matters. When iterating using ForAllColumns, which is used when
// writing datums and defining the schema, the order of columns usually
matches the underlying table. If a composite key is defined, the order in
Expand All @@ -127,7 +179,7 @@ func addParquetTestMetadata(row cdcevent.Row, opts []parquet.Option) ([]parquet.
keysInOrder = append(keysInOrder, col.Name)
return nil
}); err != nil {
return opts, err
return parquetOpts, err
}

// NB: We do not use ForAllColumns here because it will always contain the
Expand All @@ -141,7 +193,7 @@ func addParquetTestMetadata(row cdcevent.Row, opts []parquet.Option) ([]parquet.
valuesInOrder = append(valuesInOrder, col.Name)
return nil
}); err != nil {
return opts, err
return parquetOpts, err
}

// Iterate over ForAllColumns to determine the offsets of each column
Expand All @@ -160,15 +212,26 @@ func addParquetTestMetadata(row cdcevent.Row, opts []parquet.Option) ([]parquet.
idx += 1
return nil
}); err != nil {
return opts, err
return parquetOpts, err
}
valuesInOrder = append(valuesInOrder, parquetCrdbEventTypeColName)
valueCols[parquetCrdbEventTypeColName] = idx
idx += 1

opts = append(opts, parquet.WithMetadata(map[string]string{"keyCols": serializeMap(keysInOrder, keyCols)}))
opts = append(opts, parquet.WithMetadata(map[string]string{"allCols": serializeMap(valuesInOrder, valueCols)}))
return opts, nil
if encodingOpts.UpdatedTimestamps {
valuesInOrder = append(valuesInOrder, parquetOptUpdatedTimestampColName)
valueCols[parquetOptUpdatedTimestampColName] = idx
idx += 1
}
if encodingOpts.MVCCTimestamps {
valuesInOrder = append(valuesInOrder, parquetOptMVCCTimestampColName)
valueCols[parquetOptMVCCTimestampColName] = idx
idx += 1
}

parquetOpts = append(parquetOpts, parquet.WithMetadata(map[string]string{"keyCols": serializeMap(keysInOrder, keyCols)}))
parquetOpts = append(parquetOpts, parquet.WithMetadata(map[string]string{"allCols": serializeMap(valuesInOrder, valueCols)}))
return parquetOpts, nil
}

// serializeMap serializes a map to a string. For example, orderedKeys=["b",
Expand Down Expand Up @@ -213,12 +276,9 @@ func deserializeMap(s string) (orderedKeys []string, m map[string]int, err error
// newParquetWriter allocates a new parquet writer using the provided
// schema definition.
func newParquetWriter(
sch *parquet.SchemaDefinition,
sink io.Writer,
knobs *TestingKnobs, /* may be nil */
opts ...parquet.Option,
sch *parquet.SchemaDefinition, sink io.Writer, opts ...parquet.Option,
) (*parquet.Writer, error) {
if knobs != nil && knobs.EnableParquetMetadata {
if includeParquestTestMetadata {
// To use parquet test utils for reading datums, the writer needs to be
// configured with additional metadata.
return parquet.NewWriterWithReaderMeta(sch, sink, opts...)
Expand Down
10 changes: 6 additions & 4 deletions pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"path/filepath"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
Expand All @@ -29,7 +30,7 @@ import (
// This is an extra column that will be added to every parquet file which tells
// us about the type of event that generated a particular row. The types are
// defined below.
const parquetCrdbEventTypeColName string = "__crdb__event_type__"
const parquetCrdbEventTypeColName string = metaSentinel + "event_type"

type parquetEventType int

Expand Down Expand Up @@ -147,7 +148,7 @@ func (parquetSink *parquetCloudStorageSink) EmitResolvedTimestamp(

// TODO: Ideally, we do not create a new schema and writer every time
// we emit a resolved timestamp. Currently, util/parquet does not support it.
writer, err := newParquetWriter(sch, &buf, parquetSink.wrapped.testingKnobs)
writer, err := newParquetWriter(sch, &buf)
if err != nil {
return err
}
Expand Down Expand Up @@ -183,6 +184,7 @@ func (parquetSink *parquetCloudStorageSink) EncodeAndEmitRow(
prevRow cdcevent.Row,
topic TopicDescriptor,
updated, mvcc hlc.Timestamp,
encodingOpts changefeedbase.EncodingOptions,
alloc kvevent.Alloc,
) error {
s := parquetSink.wrapped
Expand All @@ -195,14 +197,14 @@ func (parquetSink *parquetCloudStorageSink) EncodeAndEmitRow(
if file.parquetCodec == nil {
var err error
file.parquetCodec, err = newParquetWriterFromRow(
updatedRow, &file.buf, parquetSink.wrapped.testingKnobs,
updatedRow, &file.buf, encodingOpts,
parquet.WithCompressionCodec(parquetSink.compression))
if err != nil {
return err
}
}

if err := file.parquetCodec.addData(updatedRow, prevRow); err != nil {
if err := file.parquetCodec.addData(updatedRow, prevRow, updated, mvcc); err != nil {
return err
}

Expand Down
Loading

0 comments on commit af78952

Please sign in to comment.