changefeedccl: support the updated and mvcc timestamp options with parquet format

Previously, changefeeds using the parquet format did not support
the `mvcc` or `updated` options. This change adds support for using
these options with parquet.

Epic: None
Release note: None
jayshrivastava committed Jun 6, 2023
1 parent e5b5ba7 commit 350fb1e
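
For orientation, the user-facing combination this commit enables looks like the following sketch (illustrative only, not code from this change: the table name and sink URI are placeholders, and sqlDB/t/db come from an assumed test harness):

// Illustrative only: with this change, a changefeed may combine the
// parquet format with the updated and mvcc_timestamp options.
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE CHANGEFEED FOR TABLE foo
INTO 'nodelocal://1/changefeed'
WITH format = parquet, updated, mvcc_timestamp`)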
Showing 8 changed files with 158 additions and 71 deletions.
3 changes: 1 addition & 2 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
@@ -443,8 +443,7 @@ var InitialScanOnlyUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, Opt
// columns, so there is no reason to emit duplicate key datums.
//
// TODO(#103129): add support for some of these
var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptDiff,
OptMVCCTimestamps, OptUpdatedTimestamps, OptKeyInValue)
var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptDiff, OptKeyInValue, OptTopicInValue)

// AlterChangefeedUnsupportedOptions are changefeed options that we do not allow
// users to alter.
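The set above is what gates options at changefeed creation: dropping OptMVCCTimestamps and OptUpdatedTimestamps from it is the user-visible change, while OptTopicInValue is now listed explicitly. A minimal sketch of how such a set is presumably consulted (illustrative names only; the real validation lives elsewhere in changefeedbase, and OptionsSet is assumed to be the string set produced by makeStringSet):

// Sketch only. Assumes OptionsSet is a map-backed string set, e.g.
// type OptionsSet map[string]struct{}; validateParquetOptions is a
// hypothetical name, not a function in this package.
func validateParquetOptions(requested []string, unsupported OptionsSet) error {
	for _, opt := range requested {
		if _, bad := unsupported[opt]; bad {
			return errors.Newf("option %s is not supported with format=parquet", opt)
		}
	}
	return nil
}

After this change, a check of this shape would accept `updated` and `mvcc_timestamp` while still rejecting, say, `diff`.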
28 changes: 15 additions & 13 deletions pkg/ccl/changefeedccl/event_processing.go
@@ -58,15 +58,15 @@ type frontier interface{ Frontier() hlc.Timestamp }

type kvEventToRowConsumer struct {
frontier
encoder Encoder
scratch bufalloc.ByteAllocator
sink EventSink
cursor hlc.Timestamp
knobs TestingKnobs
decoder cdcevent.Decoder
details ChangefeedConfig
evaluator *cdceval.Evaluator
encodingFormat changefeedbase.FormatType
encoder Encoder
scratch bufalloc.ByteAllocator
sink EventSink
cursor hlc.Timestamp
knobs TestingKnobs
decoder cdcevent.Decoder
details ChangefeedConfig
evaluator *cdceval.Evaluator
encodingOpts changefeedbase.EncodingOptions

topicDescriptorCache map[TopicIdentifier]TopicDescriptor
topicNamer *TopicNamer
@@ -256,7 +256,7 @@ func newKVEventToRowConsumer(
topicDescriptorCache: make(map[TopicIdentifier]TopicDescriptor),
topicNamer: topicNamer,
evaluator: evaluator,
encodingFormat: encodingOpts.Format,
encodingOpts: encodingOpts,
metrics: metrics,
pacer: pacer,
}, nil
@@ -429,9 +429,10 @@ func (c *kvEventToRowConsumer) encodeAndEmit(
}
}

if c.encodingFormat == changefeedbase.OptFormatParquet {
if c.encodingOpts.Format == changefeedbase.OptFormatParquet {
return c.encodeForParquet(
ctx, updatedRow, prevRow, topic, schemaTS, updatedRow.MvccTimestamp, alloc,
ctx, updatedRow, prevRow, topic, schemaTS, updatedRow.MvccTimestamp,
c.encodingOpts, alloc,
)
}
var keyCopy, valueCopy []byte
@@ -478,14 +479,15 @@ func (c *kvEventToRowConsumer) encodeForParquet(
prevRow cdcevent.Row,
topic TopicDescriptor,
updated, mvcc hlc.Timestamp,
encodingOpts changefeedbase.EncodingOptions,
alloc kvevent.Alloc,
) error {
sinkWithEncoder, ok := c.sink.(SinkWithEncoder)
if !ok {
return errors.AssertionFailedf("Expected a SinkWithEncoder for parquet format, found %T", c.sink)
}
if err := sinkWithEncoder.EncodeAndEmitRow(
ctx, updatedRow, prevRow, topic, updated, mvcc, alloc,
ctx, updatedRow, prevRow, topic, updated, mvcc, encodingOpts, alloc,
); err != nil {
return err
}
101 changes: 81 additions & 20 deletions pkg/ccl/changefeedccl/parquet.go
@@ -15,9 +15,11 @@ import (
"strings"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/parquet"
"github.com/cockroachdb/errors"
)
@@ -27,13 +29,17 @@ import (
var includeParquestTestMetadata = false

type parquetWriter struct {
inner *parquet.Writer
datumAlloc []tree.Datum
inner *parquet.Writer
encodingOpts changefeedbase.EncodingOptions
schemaDef *parquet.SchemaDefinition
datumAlloc []tree.Datum
}

// newParquetSchemaDefintion returns a parquet schema definition based on the
// cdcevent.Row and the number of cols in the schema.
func newParquetSchemaDefintion(row cdcevent.Row) (*parquet.SchemaDefinition, int, error) {
func newParquetSchemaDefintion(
row cdcevent.Row, encodingOpts changefeedbase.EncodingOptions,
) (*parquet.SchemaDefinition, error) {
var columnNames []string
var columnTypes []*types.T

@@ -44,46 +50,75 @@ func newParquetSchemaDefintion(row cdcevent.Row) (*parquet.SchemaDefinition, int
numCols += 1
return nil
}); err != nil {
return nil, 0, err
return nil, err
}

columnNames = append(columnNames, parquetCrdbEventTypeColName)
columnTypes = append(columnTypes, types.String)
numCols += 1

columnNames, columnTypes = appendMetadataColsToSchema(columnNames, columnTypes, encodingOpts)

schemaDef, err := parquet.NewSchema(columnNames, columnTypes)
if err != nil {
return nil, 0, err
return nil, err
}
return schemaDef, nil
}

const parquetOptUpdatedTimestampColName = metaSentinel + changefeedbase.OptUpdatedTimestamps
const parquetOptMVCCTimestampColName = metaSentinel + changefeedbase.OptMVCCTimestamps

func appendMetadataColsToSchema(
columnNames []string, columnTypes []*types.T, encodingOpts changefeedbase.EncodingOptions,
) (updatedNames []string, updatedTypes []*types.T) {
if encodingOpts.UpdatedTimestamps {
columnNames = append(columnNames, parquetOptUpdatedTimestampColName)
columnTypes = append(columnTypes, types.String)
}
if encodingOpts.MVCCTimestamps {
columnNames = append(columnNames, parquetOptMVCCTimestampColName)
columnTypes = append(columnTypes, types.String)
}
return schemaDef, numCols, nil
return columnNames, columnTypes
}

// newParquetWriterFromRow constructs a new parquet writer which outputs to
// the given sink. This function interprets the schema from the supplied row.
func newParquetWriterFromRow(
row cdcevent.Row, sink io.Writer, opts ...parquet.Option,
row cdcevent.Row,
sink io.Writer,
encodingOpts changefeedbase.EncodingOptions,
opts ...parquet.Option,
) (*parquetWriter, error) {
schemaDef, numCols, err := newParquetSchemaDefintion(row)
schemaDef, err := newParquetSchemaDefintion(row, encodingOpts)
if err != nil {
return nil, err
}

if includeParquestTestMetadata {
if opts, err = addParquetTestMetadata(row, opts); err != nil {
if opts, err = addParquetTestMetadata(row, encodingOpts, opts); err != nil {
return nil, err
}
}
writer, err := newParquetWriter(schemaDef, sink, opts...)
if err != nil {
return nil, err
}
return &parquetWriter{inner: writer, datumAlloc: make([]tree.Datum, numCols)}, nil
return &parquetWriter{
inner: writer,
encodingOpts: encodingOpts,
schemaDef: schemaDef,
datumAlloc: make([]tree.Datum, schemaDef.NumColumns()),
}, nil
}

// addData writes the updatedRow, adding the row's event type. There is no guarantee
// that data will be flushed after this function returns.
func (w *parquetWriter) addData(updatedRow cdcevent.Row, prevRow cdcevent.Row) error {
if err := populateDatums(updatedRow, prevRow, w.datumAlloc); err != nil {
func (w *parquetWriter) addData(
updatedRow cdcevent.Row, prevRow cdcevent.Row, updated, mvcc hlc.Timestamp,
) error {
if err := populateDatums(updatedRow, prevRow, w.encodingOpts, updated, mvcc, w.datumAlloc); err != nil {
return err
}

@@ -96,7 +131,13 @@ func (w *parquetWriter) close() error {
}

// populateDatums writes the appropriate datums into the datumAlloc slice.
func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc []tree.Datum) error {
func populateDatums(
updatedRow cdcevent.Row,
prevRow cdcevent.Row,
encodingOpts changefeedbase.EncodingOptions,
updated, mvcc hlc.Timestamp,
datumAlloc []tree.Datum,
) error {
datums := datumAlloc[:0]

if err := updatedRow.ForAllColumns().Datum(func(d tree.Datum, _ cdcevent.ResultColumn) error {
@@ -106,6 +147,13 @@ func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc []
return err
}
datums = append(datums, getEventTypeDatum(updatedRow, prevRow).DString())

if encodingOpts.UpdatedTimestamps {
datums = append(datums, tree.NewDString(timestampToString(updated)))
}
if encodingOpts.MVCCTimestamps {
datums = append(datums, tree.NewDString(timestampToString(mvcc)))
}
return nil
}

@@ -116,7 +164,9 @@ func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc []
// `[0]->{"b": "b", "c": "c"}` with the key columns in square brackets and value
// columns in a JSON object. The metadata generated by this function contains
// key and value column names along with their offsets in the parquet file.
func addParquetTestMetadata(row cdcevent.Row, opts []parquet.Option) ([]parquet.Option, error) {
func addParquetTestMetadata(
row cdcevent.Row, encodingOpts changefeedbase.EncodingOptions, parquetOpts []parquet.Option,
) ([]parquet.Option, error) {
// NB: Order matters. When iterating using ForAllColumns, which is used when
// writing datums and defining the schema, the order of columns usually
matches the underlying table. If a composite key is defined, the order in
@@ -129,7 +179,7 @@ func addParquetTestMetadata(row cdcevent.Row, opts []parquet.Option) ([]parquet.
keysInOrder = append(keysInOrder, col.Name)
return nil
}); err != nil {
return opts, err
return parquetOpts, err
}

// NB: We do not use ForAllColumns here because it will always contain the
Expand All @@ -143,7 +193,7 @@ func addParquetTestMetadata(row cdcevent.Row, opts []parquet.Option) ([]parquet.
valuesInOrder = append(valuesInOrder, col.Name)
return nil
}); err != nil {
return opts, err
return parquetOpts, err
}

// Iterate over ForAllColumns to determine the offsets of each column
@@ -162,15 +212,26 @@ func addParquetTestMetadata(row cdcevent.Row, opts []parquet.Option) ([]parquet.
idx += 1
return nil
}); err != nil {
return opts, err
return parquetOpts, err
}
valuesInOrder = append(valuesInOrder, parquetCrdbEventTypeColName)
valueCols[parquetCrdbEventTypeColName] = idx
idx += 1

opts = append(opts, parquet.WithMetadata(map[string]string{"keyCols": serializeMap(keysInOrder, keyCols)}))
opts = append(opts, parquet.WithMetadata(map[string]string{"allCols": serializeMap(valuesInOrder, valueCols)}))
return opts, nil
if encodingOpts.UpdatedTimestamps {
valuesInOrder = append(valuesInOrder, parquetOptUpdatedTimestampColName)
valueCols[parquetOptUpdatedTimestampColName] = idx
idx += 1
}
if encodingOpts.MVCCTimestamps {
valuesInOrder = append(valuesInOrder, parquetOptMVCCTimestampColName)
valueCols[parquetOptMVCCTimestampColName] = idx
idx += 1
}

parquetOpts = append(parquetOpts, parquet.WithMetadata(map[string]string{"keyCols": serializeMap(keysInOrder, keyCols)}))
parquetOpts = append(parquetOpts, parquet.WithMetadata(map[string]string{"allCols": serializeMap(valuesInOrder, valueCols)}))
return parquetOpts, nil
}

// serializeMap serializes a map to a string. For example, orderedKeys=["b",
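Net effect of the parquet.go changes: with both options enabled, every file gains two string columns after the event-type column. A presumed layout for a table t (a INT PRIMARY KEY, b STRING), assuming metaSentinel is the "__crdb__" prefix implied by the "__crdb__event_type__" literal replaced in the next file:

// Presumed column order for format=parquet, updated, mvcc_timestamp
// (assumption: metaSentinel == "__crdb__"):
//
//	a | b | __crdb__event_type | __crdb__updated | __crdb__mvcc_timestamp
//
var presumedMetadataCols = []string{
	"__crdb__event_type",     // parquetCrdbEventTypeColName
	"__crdb__updated",        // parquetOptUpdatedTimestampColName
	"__crdb__mvcc_timestamp", // parquetOptMVCCTimestampColName
}

Both timestamp columns are declared as types.String in the schema above, so downstream readers should expect string-encoded HLC timestamps rather than native parquet timestamp types.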
8 changes: 5 additions & 3 deletions pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go
@@ -15,6 +15,7 @@ import (
"path/filepath"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
@@ -29,7 +30,7 @@ import (
// This is an extra column that will be added to every parquet file which tells
// us about the type of event that generated a particular row. The types are
// defined below.
const parquetCrdbEventTypeColName string = "__crdb__event_type__"
const parquetCrdbEventTypeColName string = metaSentinel + "event_type"

type parquetEventType int

@@ -183,6 +184,7 @@ func (parquetSink *parquetCloudStorageSink) EncodeAndEmitRow(
prevRow cdcevent.Row,
topic TopicDescriptor,
updated, mvcc hlc.Timestamp,
encodingOpts changefeedbase.EncodingOptions,
alloc kvevent.Alloc,
) error {
s := parquetSink.wrapped
@@ -195,14 +197,14 @@ func (parquetSink *parquetCloudStorageSink) EncodeAndEmitRow(
if file.parquetCodec == nil {
var err error
file.parquetCodec, err = newParquetWriterFromRow(
updatedRow, &file.buf,
updatedRow, &file.buf, encodingOpts,
parquet.WithCompressionCodec(parquetSink.compression))
if err != nil {
return err
}
}

if err := file.parquetCodec.addData(updatedRow, prevRow); err != nil {
if err := file.parquetCodec.addData(updatedRow, prevRow, updated, mvcc); err != nil {
return err
}

11 changes: 7 additions & 4 deletions pkg/ccl/changefeedccl/parquet_test.go
@@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/parquet"
@@ -120,21 +121,23 @@ func TestParquetRows(t *testing.T) {
ctx, roachpb.KeyValue{Key: v.Key, Value: v.PrevValue}, cdcevent.PrevRow, v.Timestamp(), false)
require.NoError(t, err)

encodingOpts := changefeedbase.EncodingOptions{}

if writer == nil {
writer, err = newParquetWriterFromRow(updatedRow, f, parquet.WithMaxRowGroupLength(maxRowGroupSize),
writer, err = newParquetWriterFromRow(updatedRow, f, encodingOpts, parquet.WithMaxRowGroupLength(maxRowGroupSize),
parquet.WithCompressionCodec(parquet.CompressionGZIP))
if err != nil {
t.Fatalf(err.Error())
}
numCols = len(updatedRow.ResultColumns()) + 1
}

err = writer.addData(updatedRow, prevRow)
err = writer.addData(updatedRow, prevRow, hlc.Timestamp{}, hlc.Timestamp{})
require.NoError(t, err)

// Save a copy of the datums we wrote.
datumRow := make([]tree.Datum, len(updatedRow.ResultColumns())+1)
err = populateDatums(updatedRow, prevRow, datumRow)
datumRow := make([]tree.Datum, writer.schemaDef.NumColumns())
err = populateDatums(updatedRow, prevRow, encodingOpts, hlc.Timestamp{}, hlc.Timestamp{}, datumRow)
require.NoError(t, err)
datums[i] = datumRow
}
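The test above runs with empty EncodingOptions, so the new columns stay off. A hypothetical fragment enabling them (it reuses the surrounding test's variables — updatedRow, prevRow, f, v — and is not part of this commit):

// Hypothetical variant of the loop body above with the options enabled.
encodingOpts := changefeedbase.EncodingOptions{
	UpdatedTimestamps: true,
	MVCCTimestamps:    true,
}
writer, err := newParquetWriterFromRow(updatedRow, f, encodingOpts,
	parquet.WithCompressionCodec(parquet.CompressionGZIP))
require.NoError(t, err)
// v.Timestamp() stands in for both the updated and mvcc timestamps.
require.NoError(t, writer.addData(updatedRow, prevRow, v.Timestamp(), v.Timestamp()))
// Two extra metadata columns relative to the options-off schema
// (row columns + event type + updated + mvcc_timestamp).
require.Equal(t, len(updatedRow.ResultColumns())+3, writer.schemaDef.NumColumns())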
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/sink.go
@@ -450,10 +450,11 @@ func (s errorWrapperSink) EncodeAndEmitRow(
prevRow cdcevent.Row,
topic TopicDescriptor,
updated, mvcc hlc.Timestamp,
encodingOpts changefeedbase.EncodingOptions,
alloc kvevent.Alloc,
) error {
if sinkWithEncoder, ok := s.wrapped.(SinkWithEncoder); ok {
return sinkWithEncoder.EncodeAndEmitRow(ctx, updatedRow, prevRow, topic, updated, mvcc, alloc)
return sinkWithEncoder.EncodeAndEmitRow(ctx, updatedRow, prevRow, topic, updated, mvcc, encodingOpts, alloc)
}
return errors.AssertionFailedf("Expected a sink with encoder for, found %T", s.wrapped)
}
@@ -716,6 +717,7 @@ type SinkWithEncoder interface {
prevRow cdcevent.Row,
topic TopicDescriptor,
updated, mvcc hlc.Timestamp,
encodingOpts changefeedbase.EncodingOptions,
alloc kvevent.Alloc,
) error
