Merge #104528

104528: changefeedccl: add full support for the parquet format r=miretskiy a=jayshrivastava

### changefeedccl: support key_in_value with parquet format

Previously, the option `key_in_value` was disallowed with
`format=parquet`. This change allows these settings to be
used together. Note that `key_in_value` is enabled by
default with `cloudstorage` sinks and `format=parquet` is
only allowed with cloudstorage sinks, so `key_in_value` is
enabled for parquet by default.
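
A minimal sketch of what is now accepted (written in the style of this PR's tests; `feed` and `closeFeed` are the changefeedccl test helpers, and the feed factory is assumed to target a cloudstorage sink since parquet requires one):

```go
// key_in_value is now accepted alongside format=parquet. Because parquet is
// restricted to cloudstorage sinks, which turn key_in_value on by default,
// the option is effectively always set for parquet feeds.
foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH format=parquet, key_in_value`)
defer closeFeed(t, foo)
```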

Informs: #103129
Informs: #99028
Epic: [CRDB-27372](https://cockroachlabs.atlassian.net/browse/CRDB-27372)
Release note: None

---

### changefeedccl: add test coverage for parquet event types

When using `format=parquet`, an additional column is produced that
indicates the type of operation corresponding to the row: create,
update, or delete. This change adds unit test coverage for this
column.
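
For example, the updated unit test asserts rows of the following shape, where the final datum is the event-type column (a fragment mirroring the parquet_test.go changes below):

```go
// After INSERT INTO foo VALUES (1, 'b1'), the emitted datums are the row
// values followed by the event type (an insert/create event here).
expected := []tree.Datum{
	tree.NewDInt(1),
	tree.NewDString("b1"),
	parquetEventTypeDatumStringMap[parquetEventInsert],
}
```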

Additionally, the test modified in this change is simplified by
reducing the number of rows and column types it uses; this
complexity is unnecessary because all types are already covered by
the tests in the util/parquet package.

Informs: #99028
Epic: [CRDB-27372](https://cockroachlabs.atlassian.net/browse/CRDB-27372)
Release note: None

---

### util/parquet: support tuple labels in util/parquet testutils

Previously, the test utilities in `util/parquet` did not attach
labels to tuples reconstructed from files. This change updates the
package to do so. This is required for testing by users of this
package, such as CDC.
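
A sketch of the round-trip being tested, using the standard `tree`/`types` APIs (the reader plumbing itself lives in util/parquet's test utilities):

```go
// A labeled tuple such as (a: 1, b: 2) written to a parquet file is now read
// back with its labels intact, rather than as an anonymous (INT, INT) tuple.
typ := types.MakeLabeledTuple([]*types.T{types.Int, types.Int}, []string{"a", "b"})
tup := tree.NewDTuple(typ, tree.NewDInt(1), tree.NewDInt(2))
_ = tup // round-tripping tup through util/parquet preserves typ's labels
```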

Informs: #99028
Epic: [CRDB-27372](https://cockroachlabs.atlassian.net/browse/CRDB-27372)
Release note: None

---

### changefeedccl: support diff option with parquet format

This change adds support for the `diff` changefeed
option when using `format=parquet`. Enabling `diff` also adds
support for CDC transformations with parquet.
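
Internally (see the parquet.go changes below), the previous row is packed into a single JSON metadata column using a `FixedKeysObjectBuilder` cached on the writer, since the key set is fixed per schema. A condensed fragment of that pattern (column names are illustrative; error returns defer to the enclosing function):

```go
// Keys are declared once per schema; the builder is then reused per row.
vb, err := json.NewFixedKeysObjectBuilder([]string{"a", "b"})
if err != nil {
	return err
}
if err := vb.Set("a", json.FromInt(1)); err != nil {
	return err
}
if err := vb.Set("b", json.FromString("old")); err != nil {
	return err
}
j, err := vb.Build() // previous row as JSON, e.g. {"a": 1, "b": "old"}
if err != nil {
	return err
}
datums = append(datums, tree.NewDJSON(j))
```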

Informs: #103129
Informs: #99028
Epic: [CRDB-27372](https://cockroachlabs.atlassian.net/browse/CRDB-27372)
Release note: None

---

### changefeedccl: support end_time option with parquet format

This change adds support for the `end_time` changefeed
option when using `format=parquet`. No significant code
changes were needed to enable this feature.
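
Usage sketch (mirroring TestChangefeedEndTime below; `fakeEndTime` stands in for whatever timestamp string the caller computes):

```go
// end_time now combines with parquet: the feed emits everything up to the
// given timestamp and then shuts down.
feed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH format=parquet, end_time = $1`, fakeEndTime)
defer closeFeed(t, feed)
```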

Closes: #103129
Closes: #99028
Epic: [CRDB-27372](https://cockroachlabs.atlassian.net/browse/CRDB-27372)
Release note (enterprise change): Changefeeds now officially
support the parquet format at specification version 2.6.
It is only usable with the cloudstorage sink.

The syntax to use parquet is as follows:
`CREATE CHANGEFEED FOR foo INTO '...' WITH format=parquet`

It supports all standard changefeed options and features,
including CDC transformations, with the exception of the
`topic_in_value` option, which is not supported.
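
For example, the supported options compose freely (a sketch; `updated`, `mvcc_timestamp`, `resolved`, and `diff` are the standard changefeed option names):

```go
foo := feed(t, f, `CREATE CHANGEFEED FOR foo
  WITH format=parquet, diff, updated, mvcc_timestamp, resolved`)
defer closeFeed(t, foo)
```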

---

### changefeedccl: use parquet with 50% probability in nemeses test

Informs: #99028
Epic: [CRDB-27372](https://cockroachlabs.atlassian.net/browse/CRDB-27372)
Release note: None

---

### do not merge: force parquet cloud storage tests

This change forces all tests, including tests for `diff` and `end_time`,
to run with the `cloudstorage` sink and `format=parquet` where possible.

Informs: #103129
Informs: #99028
Epic: [CRDB-27372](https://cockroachlabs.atlassian.net/browse/CRDB-27372)
Release note: None

Co-authored-by: Jayant Shrivastava <jayants@cockroachlabs.com>
craig[bot] and jayshrivastava committed Jun 15, 2023
2 parents 5dc031e + d5d1c88 commit 8bfa8f5
Showing 11 changed files with 187 additions and 94 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -303,6 +303,7 @@ go_test(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/timeutil/pgdate",
"//pkg/util/uuid",
"//pkg/workload/bank",
"//pkg/workload/ledger",
"//pkg/workload/workloadsql",
6 changes: 5 additions & 1 deletion pkg/ccl/changefeedccl/cdctest/nemeses.go
@@ -154,7 +154,11 @@ func RunNemesis(f TestFeedFactory, db *gosql.DB, isSinkless bool) (Validator, er
}
}

foo, err := f.Feed(`CREATE CHANGEFEED FOR foo WITH updated, resolved, diff`)
withFormatParquet := ""
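// Note: rand.Intn(2) < 2 is always true, so parquet is effectively forced
// here (see the "do not merge: force parquet cloud storage tests" commit);
// a true 50% choice would be rand.Intn(2) == 0.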
if rand.Intn(2) < 2 {
withFormatParquet = ", format=parquet"
}
foo, err := f.Feed(fmt.Sprintf(`CREATE CHANGEFEED FOR foo WITH updated, resolved, diff %s`, withFormatParquet))
if err != nil {
return nil, err
}
18 changes: 7 additions & 11 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -270,11 +270,7 @@ func TestChangefeedBasicQuery(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`)
sqlDB.Exec(t, `UPSERT INTO foo VALUES (0, 'updated')`)
// Currently, parquet format (which may be injected by feed() call, doesn't
// know how to handle tuple types (cdc_prev); so, force JSON format.
foo := feed(t, f, `
CREATE CHANGEFEED WITH format='json'
AS SELECT *, event_op() AS op, cdc_prev FROM foo`)
foo := feed(t, f, `CREATE CHANGEFEED AS SELECT *, event_op() AS op, cdc_prev FROM foo`)
defer closeFeed(t, foo)

// 'initial' is skipped because only the latest value ('updated') is
@@ -319,7 +315,7 @@ func TestChangefeedBasicQueryWrapped(t *testing.T) {
// Parquet now supports the diff option and tuple types such as cdc_prev,
// so this test can exercise format='parquet' directly.
foo := feed(t, f, `
CREATE CHANGEFEED WITH envelope='wrapped', format='json', diff
CREATE CHANGEFEED WITH envelope='wrapped', format='parquet', diff
AS SELECT b||a AS ba, event_op() AS op FROM foo`)
defer closeFeed(t, foo)

@@ -348,7 +344,7 @@ AS SELECT b||a AS ba, event_op() AS op FROM foo`)
})
}

cdcTest(t, testFn, feedTestForceSink("webhook"))
cdcTest(t, testFn, feedTestForceSink("cloudstorage"))
}

// Same test as TestChangefeedBasicQueryWrapped, but this time using AVRO.
@@ -582,7 +578,7 @@ func TestChangefeedDiff(t *testing.T) {
sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`)
sqlDB.Exec(t, `UPSERT INTO foo VALUES (0, 'updated')`)

foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH diff`)
foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH diff, format=parquet`)
defer closeFeed(t, foo)

// 'initial' is skipped because only the latest value ('updated') is
Expand Down Expand Up @@ -614,7 +610,7 @@ func TestChangefeedDiff(t *testing.T) {
})
}

cdcTest(t, testFn)
cdcTest(t, testFn, feedTestForceSink("cloudstorage"))
}

func TestChangefeedTenants(t *testing.T) {
@@ -7215,7 +7211,7 @@ func TestChangefeedEndTime(t *testing.T) {
sqlDB.Exec(t, "INSERT INTO foo VALUES (1), (2), (3)")

fakeEndTime := s.Server.Clock().Now().Add(int64(time.Hour), 0).AsOfSystemTime()
feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH end_time = $1", fakeEndTime)
feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH end_time = $1, format=parquet", fakeEndTime)
defer closeFeed(t, feed)

assertPayloads(t, feed, []string{
@@ -7232,7 +7228,7 @@
}))
}

cdcTest(t, testFn, feedTestEnterpriseSinks)
cdcTest(t, testFn, feedTestForceSink("cloudstorage"))
}

func TestChangefeedEndTimeWithCursor(t *testing.T) {
7 changes: 1 addition & 6 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
@@ -438,12 +438,7 @@ var InitialScanOnlyUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, Opt

// ParquetFormatUnsupportedOptions is options that are not supported with the
// parquet format.
//
// OptKeyInValue is disallowed because parquet files have no concept of key
// columns, so there is no reason to emit duplicate key datums.
//
// TODO(#103129): add support for some of these
var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptDiff, OptKeyInValue, OptTopicInValue)
var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptTopicInValue)

// AlterChangefeedUnsupportedOptions are changefeed options that we do not allow
// users to alter.
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeedbase/options_test.go
@@ -33,9 +33,9 @@ func TestOptionsValidations(t *testing.T) {
}{
{map[string]string{"format": "txt"}, false, "unknown format"},
{map[string]string{"initial_scan": "", "no_initial_scan": ""}, false, "cannot specify both"},
{map[string]string{"diff": "", "format": "parquet"}, false, "cannot specify both"},
{map[string]string{"format": "txt"}, true, "unknown format"},
{map[string]string{"initial_scan": "", "no_initial_scan": ""}, true, "cannot specify both"},
{map[string]string{"format": "parquet", "topic_in_value": ""}, false, "cannot specify both"},
// Verify that the returned error uses the syntax initial_scan='yes' instead of initial_scan_only. See #97008.
{map[string]string{"initial_scan_only": "", "resolved": ""}, true, "cannot specify both initial_scan='only'"},
{map[string]string{"initial_scan_only": "", "resolved": ""}, true, "cannot specify both initial_scan='only'"},
3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/helpers_test.go
@@ -858,6 +858,9 @@ func randomSinkTypeWithOptions(options feedTestOptions) string {
sinkWeights[sinkType] = 0
}
}
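// Force the cloudstorage sink whenever it is a candidate so the parquet
// format paths are exercised; this implements the "do not merge: force
// parquet cloud storage tests" commit.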
if weight, ok := sinkWeights["cloudstorage"]; ok && weight != 0 {
sinkWeights = map[string]int{"cloudstorage": 1}
}
weightTotal := 0
for _, weight := range sinkWeights {
weightTotal += weight
67 changes: 57 additions & 10 deletions pkg/ccl/changefeedccl/parquet.go
@@ -13,13 +13,16 @@ import (
"io"
"strconv"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/json"
"github.com/cockroachdb/cockroach/pkg/util/parquet"
"github.com/cockroachdb/errors"
)
@@ -33,6 +36,9 @@ type parquetWriter struct {
encodingOpts changefeedbase.EncodingOptions
schemaDef *parquet.SchemaDefinition
datumAlloc []tree.Datum

// Cached object builder for when using the `diff` option.
vb *json.FixedKeysObjectBuilder
}

// newParquetSchemaDefintion returns a parquet schema definition based on the
@@ -68,6 +74,7 @@ func newParquetSchemaDefintion(

const parquetOptUpdatedTimestampColName = metaSentinel + changefeedbase.OptUpdatedTimestamps
const parquetOptMVCCTimestampColName = metaSentinel + changefeedbase.OptMVCCTimestamps
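// parquetOptDiffColName is the metadata column that carries the previous row
// state as a JSON datum when the diff option is enabled.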
const parquetOptDiffColName = metaSentinel + "before"

func appendMetadataColsToSchema(
columnNames []string, columnTypes []*types.T, encodingOpts changefeedbase.EncodingOptions,
@@ -80,6 +87,10 @@ func appendMetadataColsToSchema(
columnNames = append(columnNames, parquetOptMVCCTimestampColName)
columnTypes = append(columnTypes, types.String)
}
if encodingOpts.Diff {
columnNames = append(columnNames, parquetOptDiffColName)
columnTypes = append(columnTypes, types.Json)
}
return columnNames, columnTypes
}

@@ -118,7 +129,7 @@ func newParquetWriterFromRow(
func (w *parquetWriter) addData(
updatedRow cdcevent.Row, prevRow cdcevent.Row, updated, mvcc hlc.Timestamp,
) error {
if err := populateDatums(updatedRow, prevRow, w.encodingOpts, updated, mvcc, w.datumAlloc); err != nil {
if err := w.populateDatums(updatedRow, prevRow, updated, mvcc); err != nil {
return err
}

@@ -131,14 +142,10 @@ func (w *parquetWriter) close() error {
}

// populateDatums writes the appropriate datums into the datumAlloc slice.
func populateDatums(
updatedRow cdcevent.Row,
prevRow cdcevent.Row,
encodingOpts changefeedbase.EncodingOptions,
updated, mvcc hlc.Timestamp,
datumAlloc []tree.Datum,
func (w *parquetWriter) populateDatums(
updatedRow cdcevent.Row, prevRow cdcevent.Row, updated, mvcc hlc.Timestamp,
) error {
datums := datumAlloc[:0]
datums := w.datumAlloc[:0]

if err := updatedRow.ForAllColumns().Datum(func(d tree.Datum, _ cdcevent.ResultColumn) error {
datums = append(datums, d)
@@ -148,12 +155,47 @@ func populateDatums(
}
datums = append(datums, getEventTypeDatum(updatedRow, prevRow).DString())

if encodingOpts.UpdatedTimestamps {
if w.encodingOpts.UpdatedTimestamps {
datums = append(datums, tree.NewDString(timestampToString(updated)))
}
if encodingOpts.MVCCTimestamps {
if w.encodingOpts.MVCCTimestamps {
datums = append(datums, tree.NewDString(timestampToString(mvcc)))
}
if w.encodingOpts.Diff {
if prevRow.IsDeleted() {
datums = append(datums, tree.DNull)
} else {
if w.vb == nil {
keys := make([]string, 0, len(prevRow.ResultColumns()))
_ = prevRow.ForEachColumn().Col(func(col cdcevent.ResultColumn) error {
keys = append(keys, col.Name)
return nil
})
valueBuilder, err := json.NewFixedKeysObjectBuilder(keys)
if err != nil {
return err
}
w.vb = valueBuilder
}

if err := prevRow.ForEachColumn().Datum(func(d tree.Datum, col cdcevent.ResultColumn) error {
j, err := tree.AsJSON(d, sessiondatapb.DataConversionConfig{}, time.UTC)
if err != nil {
return err
}
return w.vb.Set(col.Name, j)
}); err != nil {
return err
}

j, err := w.vb.Build()
if err != nil {
return err
}
datums = append(datums, tree.NewDJSON(j))
}
}

return nil
}

@@ -228,6 +270,11 @@ func addParquetTestMetadata(
valueCols[parquetOptMVCCTimestampColName] = idx
idx += 1
}
if encodingOpts.Diff {
valuesInOrder = append(valuesInOrder, parquetOptDiffColName)
valueCols[parquetOptDiffColName] = idx
idx += 1
}

parquetOpts = append(parquetOpts, parquet.WithMetadata(map[string]string{"keyCols": serializeMap(keysInOrder, keyCols)}))
parquetOpts = append(parquetOpts, parquet.WithMetadata(map[string]string{"allCols": serializeMap(valuesInOrder, valueCols)}))
51 changes: 29 additions & 22 deletions pkg/ccl/changefeedccl/parquet_test.go
@@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/parquet"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
@@ -54,29 +55,39 @@ func TestParquetRows(t *testing.T) {
sqlDB := sqlutils.MakeSQLRunner(db)

for _, tc := range []struct {
testName string
createTable string
inserts []string
testName string
createTable string
stmts []string
expectedDatumRows [][]tree.Datum
}{
{
testName: "mixed",
createTable: `CREATE TABLE foo (
	int32Col INT4 PRIMARY KEY,
	stringCol STRING,
	uuidCol UUID
)`,
inserts: []string{
`INSERT INTO foo values (0, 'zero', 'CA', now(), 'oiwjfoijsdjif', 'inf', gen_random_uuid())`,
`INSERT INTO foo values (1, 'one', 'NY', now(), 'sdi9fu90d', '-1.90', gen_random_uuid())`,
`INSERT INTO foo values (2, 'two', 'WA', now(), 'sd9fid9fuj', '0.01', gen_random_uuid())`,
`INSERT INTO foo values (3, 'three', 'ON', now(), 'sadklfhkdlsjf', '1.2', gen_random_uuid())`,
`INSERT INTO foo values (4, 'four', 'NS', now(), '123123', '-11222221.2', gen_random_uuid())`,
`INSERT INTO foo values (5, 'five', 'BC', now(), 'sadklfhkdlsjf', '1.2', gen_random_uuid())`,
`INSERT INTO foo values (6, 'siz', 'AB', now(), '123123', '-11222221.2', gen_random_uuid())`,
stmts: []string{
`INSERT INTO foo VALUES (0, 'a1', '2fec7a4b-0a78-40ce-92e0-d1c0fac70436')`,
`INSERT INTO foo VALUES (1, 'b1', '0ce43188-e4a9-4b73-803b-a253abc57e6b')`,
`INSERT INTO foo VALUES (2, 'c1', '5a02bd48-ba64-4134-9199-844c1517f722')`,
`UPDATE foo SET stringCol = 'changed' WHERE int32Col = 1`,
`DELETE FROM foo WHERE int32Col = 0`,
},
expectedDatumRows: [][]tree.Datum{
{tree.NewDInt(0), tree.NewDString("a1"),
&tree.DUuid{uuid.FromStringOrNil("2fec7a4b-0a78-40ce-92e0-d1c0fac70436")},
parquetEventTypeDatumStringMap[parquetEventInsert]},
{tree.NewDInt(1), tree.NewDString("b1"),
&tree.DUuid{uuid.FromStringOrNil("0ce43188-e4a9-4b73-803b-a253abc57e6b")},
parquetEventTypeDatumStringMap[parquetEventInsert]},
{tree.NewDInt(2), tree.NewDString("c1"),
&tree.DUuid{uuid.FromStringOrNil("5a02bd48-ba64-4134-9199-844c1517f722")},
parquetEventTypeDatumStringMap[parquetEventInsert]},
{tree.NewDInt(1), tree.NewDString("changed"),
&tree.DUuid{uuid.FromStringOrNil("0ce43188-e4a9-4b73-803b-a253abc57e6b")},
parquetEventTypeDatumStringMap[parquetEventUpdate]},
{tree.NewDInt(0), tree.DNull, tree.DNull, parquetEventTypeDatumStringMap[parquetEventDelete]},
},
},
} {
@@ -104,8 +115,8 @@
}
}()

numRows := len(tc.inserts)
for _, insertStmt := range tc.inserts {
numRows := len(tc.stmts)
for _, insertStmt := range tc.stmts {
sqlDB.Exec(t, insertStmt)
}

@@ -135,11 +146,7 @@
err = writer.addData(updatedRow, prevRow, hlc.Timestamp{}, hlc.Timestamp{})
require.NoError(t, err)

// Save a copy of the datums we wrote.
datumRow := make([]tree.Datum, writer.schemaDef.NumColumns())
err = populateDatums(updatedRow, prevRow, encodingOpts, hlc.Timestamp{}, hlc.Timestamp{}, datumRow)
require.NoError(t, err)
datums[i] = datumRow
datums[i] = tc.expectedDatumRows[i]
}

err = writer.close()