From fdec022170528d4f4669a267ce784e913c2d4951 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Tue, 6 Jun 2023 16:39:44 -0400 Subject: [PATCH 1/7] changefeedccl: support key_in_value with parquet format Previously, the option `key_in_value` was disallowed with `format=parquet`. This change allows these settings to be used together. Note that `key_in_value` is enabled by default with `cloudstorage` sinks and `format=parquet` is only allowed with cloudstorage sinks, so `key_in_value` is enabled for parquet by default. Informs: https://github.com/cockroachdb/cockroach/issues/103129 Informs: https://github.com/cockroachdb/cockroach/issues/99028 Epic: CRDB-27372 Release note: None --- .../changefeedccl/changefeedbase/options.go | 5 +- pkg/ccl/changefeedccl/testfeed_test.go | 57 ++++++++++--------- 2 files changed, 32 insertions(+), 30 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index 97ee75b2348e..6a6af49aac42 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -439,11 +439,8 @@ var InitialScanOnlyUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, Opt // ParquetFormatUnsupportedOptions is options that are not supported with the // parquet format. // -// OptKeyInValue is disallowed because parquet files have no concept of key -// columns, so there is no reason to emit duplicate key datums. -// // TODO(#103129): add support for some of these -var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptDiff, OptKeyInValue, OptTopicInValue) +var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptDiff, OptTopicInValue) // AlterChangefeedUnsupportedOptions are changefeed options that we do not allow // users to alter. diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index c5df6e32538d..1110feeb6c4c 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -1050,41 +1050,46 @@ func (f *cloudFeedFactory) Feed( tree.KVOption{Key: changefeedbase.OptKeyInValue}, ) } - // Determine if we can enable the parquet format if the changefeed is not - // being created with incompatible options. If it can be enabled, we will use - // parquet format with a probability of 0.4. - parquetPossible := includeParquestTestMetadata + + formatSpecified := false explicitEnvelope := false for _, opt := range createStmt.Options { + if string(opt.Key) == changefeedbase.OptFormat { + formatSpecified = true + } if string(opt.Key) == changefeedbase.OptEnvelope { explicitEnvelope = true } - if string(opt.Key) == changefeedbase.OptFormat && - opt.Value.String() != string(changefeedbase.OptFormatParquet) { + } + + if !formatSpecified { + // Determine if we can enable the parquet format if the changefeed is not + // being created with incompatible options. If it can be enabled, we will use + // parquet format with a probability of 0.4. + parquetPossible := includeParquestTestMetadata + for _, opt := range createStmt.Options { + for o := range changefeedbase.ParquetFormatUnsupportedOptions { + if o == string(opt.Key) { + parquetPossible = false + break + } + } + } + randNum := rand.Intn(5) + if randNum < 3 { parquetPossible = false - break } - for o := range changefeedbase.ParquetFormatUnsupportedOptions { - if o == string(opt.Key) { - parquetPossible = false - break - } + if parquetPossible { + log.Infof(context.Background(), "using parquet format") + createStmt.Options = append( + createStmt.Options, + tree.KVOption{ + Key: changefeedbase.OptFormat, + Value: tree.NewStrVal(string(changefeedbase.OptFormatParquet)), + }, + ) } } - randNum := rand.Intn(5) - if randNum < 2 { - parquetPossible = false - } - if parquetPossible { - log.Infof(context.Background(), "using parquet format") - createStmt.Options = append( - createStmt.Options, - tree.KVOption{ - Key: changefeedbase.OptFormat, - Value: tree.NewStrVal(string(changefeedbase.OptFormatParquet)), - }, - ) - } feedDir := feedSubDir() sinkURI := `nodelocal://1/` + feedDir From 707f7d19770d212b2f2f73fc0795310e953ebe7b Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Thu, 8 Jun 2023 10:50:01 -0400 Subject: [PATCH 2/7] changefeedccl: add test coverage for parquet event types When using `format=parquet`, an additional column is produced to indicate the type of operation corresponding to the row: create, update, or delete. This change adds coverage for this in unit testing. Additionally, the test modified in this change is made more simple by reducing the number of rows and different types because this complexity is unnecessary as all types are tested within the util/parquet package already. Informs: #99028 Epic: CRDB-27372 Release note: None Epic: None --- pkg/ccl/changefeedccl/BUILD.bazel | 1 + pkg/ccl/changefeedccl/parquet_test.go | 51 +++++++++++++++------------ 2 files changed, 30 insertions(+), 22 deletions(-) diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index c180f3344f40..e7dc09306fc6 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -303,6 +303,7 @@ go_test( "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/timeutil/pgdate", + "//pkg/util/uuid", "//pkg/workload/bank", "//pkg/workload/ledger", "//pkg/workload/workloadsql", diff --git a/pkg/ccl/changefeedccl/parquet_test.go b/pkg/ccl/changefeedccl/parquet_test.go index a55fd3ca8839..4f970523ce58 100644 --- a/pkg/ccl/changefeedccl/parquet_test.go +++ b/pkg/ccl/changefeedccl/parquet_test.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/parquet" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -54,29 +55,39 @@ func TestParquetRows(t *testing.T) { sqlDB := sqlutils.MakeSQLRunner(db) for _, tc := range []struct { - testName string - createTable string - inserts []string + testName string + createTable string + stmts []string + expectedDatumRows [][]tree.Datum }{ { testName: "mixed", createTable: `CREATE TABLE foo ( int32Col INT4 PRIMARY KEY, - varCharCol VARCHAR(16) , - charCol CHAR(2), - tsCol TIMESTAMP , stringCol STRING , - decimalCOl DECIMAL(12,2), uuidCol UUID )`, - inserts: []string{ - `INSERT INTO foo values (0, 'zero', 'CA', now(), 'oiwjfoijsdjif', 'inf', gen_random_uuid())`, - `INSERT INTO foo values (1, 'one', 'NY', now(), 'sdi9fu90d', '-1.90', gen_random_uuid())`, - `INSERT INTO foo values (2, 'two', 'WA', now(), 'sd9fid9fuj', '0.01', gen_random_uuid())`, - `INSERT INTO foo values (3, 'three', 'ON', now(), 'sadklfhkdlsjf', '1.2', gen_random_uuid())`, - `INSERT INTO foo values (4, 'four', 'NS', now(), '123123', '-11222221.2', gen_random_uuid())`, - `INSERT INTO foo values (5, 'five', 'BC', now(), 'sadklfhkdlsjf', '1.2', gen_random_uuid())`, - `INSERT INTO foo values (6, 'siz', 'AB', now(), '123123', '-11222221.2', gen_random_uuid())`, + stmts: []string{ + `INSERT INTO foo VALUES (0, 'a1', '2fec7a4b-0a78-40ce-92e0-d1c0fac70436')`, + `INSERT INTO foo VALUES (1, 'b1', '0ce43188-e4a9-4b73-803b-a253abc57e6b')`, + `INSERT INTO foo VALUES (2, 'c1', '5a02bd48-ba64-4134-9199-844c1517f722')`, + `UPDATE foo SET stringCol = 'changed' WHERE int32Col = 1`, + `DELETE FROM foo WHERE int32Col = 0`, + }, + expectedDatumRows: [][]tree.Datum{ + {tree.NewDInt(0), tree.NewDString("a1"), + &tree.DUuid{uuid.FromStringOrNil("2fec7a4b-0a78-40ce-92e0-d1c0fac70436")}, + parquetEventTypeDatumStringMap[parquetEventInsert]}, + {tree.NewDInt(1), tree.NewDString("b1"), + &tree.DUuid{uuid.FromStringOrNil("0ce43188-e4a9-4b73-803b-a253abc57e6b")}, + parquetEventTypeDatumStringMap[parquetEventInsert]}, + {tree.NewDInt(2), tree.NewDString("c1"), + &tree.DUuid{uuid.FromStringOrNil("5a02bd48-ba64-4134-9199-844c1517f722")}, + parquetEventTypeDatumStringMap[parquetEventInsert]}, + {tree.NewDInt(1), tree.NewDString("changed"), + &tree.DUuid{uuid.FromStringOrNil("0ce43188-e4a9-4b73-803b-a253abc57e6b")}, + parquetEventTypeDatumStringMap[parquetEventUpdate]}, + {tree.NewDInt(0), tree.DNull, tree.DNull, parquetEventTypeDatumStringMap[parquetEventDelete]}, }, }, } { @@ -104,8 +115,8 @@ func TestParquetRows(t *testing.T) { } }() - numRows := len(tc.inserts) - for _, insertStmt := range tc.inserts { + numRows := len(tc.stmts) + for _, insertStmt := range tc.stmts { sqlDB.Exec(t, insertStmt) } @@ -135,11 +146,7 @@ func TestParquetRows(t *testing.T) { err = writer.addData(updatedRow, prevRow, hlc.Timestamp{}, hlc.Timestamp{}) require.NoError(t, err) - // Save a copy of the datums we wrote. - datumRow := make([]tree.Datum, writer.schemaDef.NumColumns()) - err = populateDatums(updatedRow, prevRow, encodingOpts, hlc.Timestamp{}, hlc.Timestamp{}, datumRow) - require.NoError(t, err) - datums[i] = datumRow + datums[i] = tc.expectedDatumRows[i] } err = writer.close() From d49d01f0d925df0cf9d4252f9b923a57f8696218 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Wed, 7 Jun 2023 17:31:39 -0400 Subject: [PATCH 3/7] util/parquet: support tuple labels in util/parquet testutils Previously, the test utilities in `util/parquet` would not reconstruct tuples read from files with their labels. This change updates the package to do so. This is required for testing in users of this package such as CDC. Informs: #99028 Epic: CRDB-27372 Release note: None --- pkg/util/parquet/testutils.go | 50 ++++++++++++++++++++++++--------- pkg/util/parquet/writer_test.go | 10 ++++--- 2 files changed, 43 insertions(+), 17 deletions(-) diff --git a/pkg/util/parquet/testutils.go b/pkg/util/parquet/testutils.go index bbc7c38d4489..d6f88bfc10ad 100644 --- a/pkg/util/parquet/testutils.go +++ b/pkg/util/parquet/testutils.go @@ -119,6 +119,7 @@ func ReadFile(parquetFile string) (meta ReadDatumsMetadata, datums [][]tree.Datu return ReadDatumsMetadata{}, nil, err } + var colNames []string startingRowIdx := 0 for rg := 0; rg < reader.NumRowGroups(); rg++ { rgr := reader.RowGroup(rg) @@ -126,6 +127,9 @@ func ReadFile(parquetFile string) (meta ReadDatumsMetadata, datums [][]tree.Datu for i := int64(0); i < rowsInRowGroup; i++ { readDatums = append(readDatums, make([]tree.Datum, rgr.NumColumns())) } + if rg == 0 { + colNames = make([]string, 0, rgr.NumColumns()) + } for colIdx := 0; colIdx < rgr.NumColumns(); colIdx++ { col, err := rgr.Column(colIdx) @@ -133,6 +137,10 @@ func ReadFile(parquetFile string) (meta ReadDatumsMetadata, datums [][]tree.Datu return ReadDatumsMetadata{}, nil, err } + if rg == 0 { + colNames = append(colNames, col.Descriptor().Name()) + } + dec, err := decoderFromFamilyAndType(oid.Oid(typOids[colIdx]), types.Family(typFamilies[colIdx])) if err != nil { return ReadDatumsMetadata{}, nil, err @@ -154,7 +162,7 @@ func ReadFile(parquetFile string) (meta ReadDatumsMetadata, datums [][]tree.Datu } for i := 0; i < len(readDatums); i++ { - readDatums[i] = squashTuples(readDatums[i], tupleColumns) + readDatums[i] = squashTuples(readDatums[i], tupleColumns, colNames) } return makeDatumMeta(reader, readDatums), readDatums, nil @@ -411,11 +419,14 @@ func ValidateDatum(t *testing.T, expected tree.Datum, actual tree.Datum) { ValidateDatum(t, arr1[i], arr2[i]) } case types.TupleFamily: - arr1 := expected.(*tree.DTuple).D - arr2 := actual.(*tree.DTuple).D - require.Equal(t, len(arr1), len(arr2)) - for i := 0; i < len(arr1); i++ { - ValidateDatum(t, arr1[i], arr2[i]) + t1 := expected.(*tree.DTuple) + t2 := actual.(*tree.DTuple) + require.Equal(t, len(t1.D), len(t2.D)) + for i := 0; i < len(t1.D); i++ { + ValidateDatum(t, t1.D[i], t2.D[i]) + } + if t1.ResolvedType().TupleLabels() != nil { + require.Equal(t, t1.ResolvedType().TupleLabels(), t2.ResolvedType().TupleLabels()) } case types.EnumFamily: require.Equal(t, expected.(*tree.DEnum).LogicalRep, actual.(*tree.DEnum).LogicalRep) @@ -544,14 +555,25 @@ type dNullTupleType struct { } // squashTuples takes an array of datums and merges groups of adjacent datums -// into tuples using the passed intervals. Example: +// into tuples using the supplied intervals. The provided column names will be +// used as tuple labels. +// +// For example: +// +// Input: // -// Input: ["0", "1", "2", "3", "4", "5", "6"] [[0, 1], [3, 3], [4, 6]] -// Output: [("0", "1"), "2", ("3"), ("4", "5", "6")] +// datumRow = ["0", "1", "2", "3", "4", "5", "6"] +// tupleColIndexes = [[0, 1], [3, 3], [4, 6]] +// colNames = ["a", "b", "c", "d", "e", "f", "g"] +// +// Output: +// +// [(a: "0", b: "1"), "2", (d: "3"), (e: "4", f: "5", g: "6")] // // Behavior is undefined if the intervals are not sorted, not disjoint, -// not ascending, or out of bounds. -func squashTuples(datumRow []tree.Datum, tupleColIndexes [][]int) []tree.Datum { +// not ascending, or out of bounds. The number of elements in datumRow +// should be equal to the number of labels in colNames. +func squashTuples(datumRow []tree.Datum, tupleColIndexes [][]int, colNames []string) []tree.Datum { if len(tupleColIndexes) == 0 { return datumRow } @@ -559,6 +581,7 @@ func squashTuples(datumRow []tree.Datum, tupleColIndexes [][]int) []tree.Datum { var updatedDatums []tree.Datum var currentTupleDatums []tree.Datum var currentTupleTypes []*types.T + var currentTupleLabels []string for i, d := range datumRow { if tupleIdx < len(tupleColIndexes) { tupleUpperIdx := tupleColIndexes[tupleIdx][1] @@ -567,18 +590,19 @@ func squashTuples(datumRow []tree.Datum, tupleColIndexes [][]int) []tree.Datum { if i >= tupleLowerIdx && i <= tupleUpperIdx { currentTupleDatums = append(currentTupleDatums, d) currentTupleTypes = append(currentTupleTypes, d.ResolvedType()) - + currentTupleLabels = append(currentTupleLabels, colNames[i]) if i == tupleUpperIdx { // Check for marker that indicates the entire tuple is NULL. if currentTupleDatums[0] == dNullTuple { updatedDatums = append(updatedDatums, tree.DNull) } else { - tupleDatum := tree.MakeDTuple(types.MakeTuple(currentTupleTypes), currentTupleDatums...) + tupleDatum := tree.MakeDTuple(types.MakeLabeledTuple(currentTupleTypes, currentTupleLabels), currentTupleDatums...) updatedDatums = append(updatedDatums, &tupleDatum) } currentTupleTypes = []*types.T{} currentTupleDatums = []tree.Datum{} + currentTupleLabels = []string{} tupleIdx += 1 } diff --git a/pkg/util/parquet/writer_test.go b/pkg/util/parquet/writer_test.go index 754bbe1398e4..abb90a5893c8 100644 --- a/pkg/util/parquet/writer_test.go +++ b/pkg/util/parquet/writer_test.go @@ -623,6 +623,7 @@ func optionsTest(t *testing.T, opt Option, testFn func(t *testing.T, reader *fil err = reader.Close() require.NoError(t, err) } + func TestSquashTuples(t *testing.T) { datums := []tree.Datum{ tree.NewDInt(1), @@ -634,6 +635,7 @@ func TestSquashTuples(t *testing.T) { tree.NewDJSON(json.FromBool(false)), tree.NewDInt(0), } + labels := []string{"a", "b", "c", "d", "e", "f", "g", "h"} for _, tc := range []struct { tupleIntervals [][]int @@ -645,18 +647,18 @@ func TestSquashTuples(t *testing.T) { }, { tupleIntervals: [][]int{{0, 1}, {2, 4}}, - tupleOutput: "[(1, 'string') ('\\x6279746573', '52fdfc07-2182-454f-963f-5f0f9a621d72', '1') 0.1 'false' 0]", + tupleOutput: "[((1, 'string') AS a, b) (('\\x6279746573', '52fdfc07-2182-454f-963f-5f0f9a621d72', '1') AS c, d, e) 0.1 'false' 0]", }, { tupleIntervals: [][]int{{0, 2}, {3, 3}}, - tupleOutput: "[(1, 'string', '\\x6279746573') ('52fdfc07-2182-454f-963f-5f0f9a621d72',) '1' 0.1 'false' 0]", + tupleOutput: "[((1, 'string', '\\x6279746573') AS a, b, c) (('52fdfc07-2182-454f-963f-5f0f9a621d72',) AS d) '1' 0.1 'false' 0]", }, { tupleIntervals: [][]int{{0, 7}}, - tupleOutput: "[(1, 'string', '\\x6279746573', '52fdfc07-2182-454f-963f-5f0f9a621d72', '1', 0.1, 'false', 0)]", + tupleOutput: "[((1, 'string', '\\x6279746573', '52fdfc07-2182-454f-963f-5f0f9a621d72', '1', 0.1, 'false', 0) AS a, b, c, d, e, f, g, h)]", }, } { - squashedDatums := squashTuples(datums, tc.tupleIntervals) + squashedDatums := squashTuples(datums, tc.tupleIntervals, labels) require.Equal(t, tc.tupleOutput, fmt.Sprint(squashedDatums)) } } From 735234322816718598173913ca2f5393bdc232a9 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Wed, 7 Jun 2023 15:35:01 -0400 Subject: [PATCH 4/7] changefeedccl: support diff option with parquet format This change adds support for the `diff` changefeed options when using `format=parquet`. Enabling `diff` also adds support for CDC Transformations with parquet. Informs: https://github.com/cockroachdb/cockroach/issues/103129 Informs: https://github.com/cockroachdb/cockroach/issues/99028 Epic: CRDB-27372 Release note: None --- pkg/ccl/changefeedccl/changefeed_test.go | 6 +- .../changefeedccl/changefeedbase/options.go | 4 +- .../changefeedbase/options_test.go | 2 +- pkg/ccl/changefeedccl/parquet.go | 67 ++++++++++++++++--- pkg/ccl/changefeedccl/testfeed_test.go | 9 +++ 5 files changed, 69 insertions(+), 19 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index ef87dadc34c7..7fb6b927ee84 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -269,11 +269,7 @@ func TestChangefeedBasicQuery(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`) sqlDB.Exec(t, `UPSERT INTO foo VALUES (0, 'updated')`) - // Currently, parquet format (which may be injected by feed() call, doesn't - // know how to handle tuple types (cdc_prev); so, force JSON format. - foo := feed(t, f, ` -CREATE CHANGEFEED WITH format='json' -AS SELECT *, event_op() AS op, cdc_prev FROM foo`) + foo := feed(t, f, `CREATE CHANGEFEED AS SELECT *, event_op() AS op, cdc_prev FROM foo`) defer closeFeed(t, foo) // 'initial' is skipped because only the latest value ('updated') is diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index 6a6af49aac42..291d91a323af 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -438,9 +438,7 @@ var InitialScanOnlyUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, Opt // ParquetFormatUnsupportedOptions is options that are not supported with the // parquet format. -// -// TODO(#103129): add support for some of these -var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptDiff, OptTopicInValue) +var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptTopicInValue) // AlterChangefeedUnsupportedOptions are changefeed options that we do not allow // users to alter. diff --git a/pkg/ccl/changefeedccl/changefeedbase/options_test.go b/pkg/ccl/changefeedccl/changefeedbase/options_test.go index 4eb5c62a0dbc..a5e8eb342e09 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options_test.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options_test.go @@ -33,9 +33,9 @@ func TestOptionsValidations(t *testing.T) { }{ {map[string]string{"format": "txt"}, false, "unknown format"}, {map[string]string{"initial_scan": "", "no_initial_scan": ""}, false, "cannot specify both"}, - {map[string]string{"diff": "", "format": "parquet"}, false, "cannot specify both"}, {map[string]string{"format": "txt"}, true, "unknown format"}, {map[string]string{"initial_scan": "", "no_initial_scan": ""}, true, "cannot specify both"}, + {map[string]string{"format": "parquet", "topic_in_value": ""}, false, "cannot specify both"}, // Verify that the returned error uses the syntax initial_scan='yes' instead of initial_scan_only. See #97008. {map[string]string{"initial_scan_only": "", "resolved": ""}, true, "cannot specify both initial_scan='only'"}, {map[string]string{"initial_scan_only": "", "resolved": ""}, true, "cannot specify both initial_scan='only'"}, diff --git a/pkg/ccl/changefeedccl/parquet.go b/pkg/ccl/changefeedccl/parquet.go index 9e50c8360adb..95d5e16a51be 100644 --- a/pkg/ccl/changefeedccl/parquet.go +++ b/pkg/ccl/changefeedccl/parquet.go @@ -13,13 +13,16 @@ import ( "io" "strconv" "strings" + "time" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/json" "github.com/cockroachdb/cockroach/pkg/util/parquet" "github.com/cockroachdb/errors" ) @@ -33,6 +36,9 @@ type parquetWriter struct { encodingOpts changefeedbase.EncodingOptions schemaDef *parquet.SchemaDefinition datumAlloc []tree.Datum + + // Cached object builder for when using the `diff` option. + vb *json.FixedKeysObjectBuilder } // newParquetSchemaDefintion returns a parquet schema definition based on the @@ -68,6 +74,7 @@ func newParquetSchemaDefintion( const parquetOptUpdatedTimestampColName = metaSentinel + changefeedbase.OptUpdatedTimestamps const parquetOptMVCCTimestampColName = metaSentinel + changefeedbase.OptMVCCTimestamps +const parquetOptDiffColName = metaSentinel + "before" func appendMetadataColsToSchema( columnNames []string, columnTypes []*types.T, encodingOpts changefeedbase.EncodingOptions, @@ -80,6 +87,10 @@ func appendMetadataColsToSchema( columnNames = append(columnNames, parquetOptMVCCTimestampColName) columnTypes = append(columnTypes, types.String) } + if encodingOpts.Diff { + columnNames = append(columnNames, parquetOptDiffColName) + columnTypes = append(columnTypes, types.Json) + } return columnNames, columnTypes } @@ -118,7 +129,7 @@ func newParquetWriterFromRow( func (w *parquetWriter) addData( updatedRow cdcevent.Row, prevRow cdcevent.Row, updated, mvcc hlc.Timestamp, ) error { - if err := populateDatums(updatedRow, prevRow, w.encodingOpts, updated, mvcc, w.datumAlloc); err != nil { + if err := w.populateDatums(updatedRow, prevRow, updated, mvcc); err != nil { return err } @@ -131,14 +142,10 @@ func (w *parquetWriter) close() error { } // populateDatums writes the appropriate datums into the datumAlloc slice. -func populateDatums( - updatedRow cdcevent.Row, - prevRow cdcevent.Row, - encodingOpts changefeedbase.EncodingOptions, - updated, mvcc hlc.Timestamp, - datumAlloc []tree.Datum, +func (w *parquetWriter) populateDatums( + updatedRow cdcevent.Row, prevRow cdcevent.Row, updated, mvcc hlc.Timestamp, ) error { - datums := datumAlloc[:0] + datums := w.datumAlloc[:0] if err := updatedRow.ForAllColumns().Datum(func(d tree.Datum, _ cdcevent.ResultColumn) error { datums = append(datums, d) @@ -148,12 +155,47 @@ func populateDatums( } datums = append(datums, getEventTypeDatum(updatedRow, prevRow).DString()) - if encodingOpts.UpdatedTimestamps { + if w.encodingOpts.UpdatedTimestamps { datums = append(datums, tree.NewDString(timestampToString(updated))) } - if encodingOpts.MVCCTimestamps { + if w.encodingOpts.MVCCTimestamps { datums = append(datums, tree.NewDString(timestampToString(mvcc))) } + if w.encodingOpts.Diff { + if prevRow.IsDeleted() { + datums = append(datums, tree.DNull) + } else { + if w.vb == nil { + keys := make([]string, 0, len(prevRow.ResultColumns())) + _ = prevRow.ForEachColumn().Col(func(col cdcevent.ResultColumn) error { + keys = append(keys, col.Name) + return nil + }) + valueBuilder, err := json.NewFixedKeysObjectBuilder(keys) + if err != nil { + return err + } + w.vb = valueBuilder + } + + if err := prevRow.ForEachColumn().Datum(func(d tree.Datum, col cdcevent.ResultColumn) error { + j, err := tree.AsJSON(d, sessiondatapb.DataConversionConfig{}, time.UTC) + if err != nil { + return err + } + return w.vb.Set(col.Name, j) + }); err != nil { + return err + } + + j, err := w.vb.Build() + if err != nil { + return err + } + datums = append(datums, tree.NewDJSON(j)) + } + } + return nil } @@ -228,6 +270,11 @@ func addParquetTestMetadata( valueCols[parquetOptMVCCTimestampColName] = idx idx += 1 } + if encodingOpts.Diff { + valuesInOrder = append(valuesInOrder, parquetOptDiffColName) + valueCols[parquetOptDiffColName] = idx + idx += 1 + } parquetOpts = append(parquetOpts, parquet.WithMetadata(map[string]string{"keyCols": serializeMap(keysInOrder, keyCols)})) parquetOpts = append(parquetOpts, parquet.WithMetadata(map[string]string{"allCols": serializeMap(valuesInOrder, valueCols)})) diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index 1110feeb6c4c..16e0eeeca5bd 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -1262,6 +1262,8 @@ func (c *cloudFeed) appendParquetTestFeedMessages( metaColumnNameSet[colName] = colIdx case parquetOptMVCCTimestampColName: metaColumnNameSet[colName] = colIdx + case parquetOptDiffColName: + metaColumnNameSet[colName] = colIdx default: } } @@ -1331,6 +1333,13 @@ func (c *cloudFeed) appendParquetTestFeedMessages( } valueWithAfter.Add(changefeedbase.OptMVCCTimestamps, j) } + if mvccColIdx, mvcc := metaColumnNameSet[parquetOptDiffColName]; mvcc { + j, err := tree.AsJSON(row[mvccColIdx], sessiondatapb.DataConversionConfig{}, time.UTC) + if err != nil { + return err + } + valueWithAfter.Add("before", j) + } } keyJSON := keyJSONBuilder.Build() From f763d8e4b3dbc0019b7f8f61dd5869be72650025 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Wed, 7 Jun 2023 15:36:15 -0400 Subject: [PATCH 5/7] changefeedccl: support end_time option with parquet format This change adds support for the `end_time` changefeed options when using `format=parquet`. No significant code changes were needed to enable this feature. Closes: https://github.com/cockroachdb/cockroach/issues/103129 Closes: https://github.com/cockroachdb/cockroach/issues/99028 Epic: CRDB-27372 Release note (enterprise change): Changefeeds now officially support the parquet format at specificiation version 2.6. It is only usable with the cloudstorage sink. The syntax to use parquet is like the following: `CREATE CHANGEFEED FOR foo INTO `...` WITH format=parquet` It supports all standard changefeed options and features including CDC transformations, except it does not support the `topic_in_value` option. --- pkg/ccl/changefeedccl/changefeedbase/options.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index 291d91a323af..1e2a8a4d8b40 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -438,7 +438,7 @@ var InitialScanOnlyUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, Opt // ParquetFormatUnsupportedOptions is options that are not supported with the // parquet format. -var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptTopicInValue) +var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptTopicInValue) // AlterChangefeedUnsupportedOptions are changefeed options that we do not allow // users to alter. From 32d77068fdf97b27a571224b7f6cf4ad5316b340 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Thu, 8 Jun 2023 11:28:22 -0400 Subject: [PATCH 6/7] changefeedccl: use parquet with 50% probability in nemeses test Informs: #99028 Epic: CRDB-27372 Release note: None --- pkg/ccl/changefeedccl/cdctest/nemeses.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/ccl/changefeedccl/cdctest/nemeses.go b/pkg/ccl/changefeedccl/cdctest/nemeses.go index bb1612fcd2fb..6bb4597c75a0 100644 --- a/pkg/ccl/changefeedccl/cdctest/nemeses.go +++ b/pkg/ccl/changefeedccl/cdctest/nemeses.go @@ -154,7 +154,11 @@ func RunNemesis(f TestFeedFactory, db *gosql.DB, isSinkless bool) (Validator, er } } - foo, err := f.Feed(`CREATE CHANGEFEED FOR foo WITH updated, resolved, diff`) + withFormatParquet := "" + if rand.Intn(2) < 1 { + withFormatParquet = ", format=parquet" + } + foo, err := f.Feed(fmt.Sprintf(`CREATE CHANGEFEED FOR foo WITH updated, resolved, diff %s`, withFormatParquet)) if err != nil { return nil, err } From d5d1c88aa851398582f5fdf9e30653f8d4123883 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Tue, 6 Jun 2023 16:40:52 -0400 Subject: [PATCH 7/7] do not merge: force parquet cloud storage tests This change forces all tests, including tests for `diff` and `end_time` to run with the `cloudstorage` sink and `format=parquet` where possible. Informs: https://github.com/cockroachdb/cockroach/issues/103129 Informs: https://github.com/cockroachdb/cockroach/issues/99028 Epic: CRDB-27372 Release note: None --- pkg/ccl/changefeedccl/cdctest/nemeses.go | 2 +- pkg/ccl/changefeedccl/changefeed_test.go | 12 ++++++------ pkg/ccl/changefeedccl/helpers_test.go | 3 +++ pkg/ccl/changefeedccl/testfeed_test.go | 2 +- 4 files changed, 11 insertions(+), 8 deletions(-) diff --git a/pkg/ccl/changefeedccl/cdctest/nemeses.go b/pkg/ccl/changefeedccl/cdctest/nemeses.go index 6bb4597c75a0..93b0e498693b 100644 --- a/pkg/ccl/changefeedccl/cdctest/nemeses.go +++ b/pkg/ccl/changefeedccl/cdctest/nemeses.go @@ -155,7 +155,7 @@ func RunNemesis(f TestFeedFactory, db *gosql.DB, isSinkless bool) (Validator, er } withFormatParquet := "" - if rand.Intn(2) < 1 { + if rand.Intn(2) < 2 { withFormatParquet = ", format=parquet" } foo, err := f.Feed(fmt.Sprintf(`CREATE CHANGEFEED FOR foo WITH updated, resolved, diff %s`, withFormatParquet)) diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 7fb6b927ee84..12168d21209d 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -314,7 +314,7 @@ func TestChangefeedBasicQueryWrapped(t *testing.T) { // Currently, parquet format (which may be injected by feed() call), doesn't // know how to handle tuple types (cdc_prev); so, force JSON format. foo := feed(t, f, ` -CREATE CHANGEFEED WITH envelope='wrapped', format='json', diff +CREATE CHANGEFEED WITH envelope='wrapped', format='parquet', diff AS SELECT b||a AS ba, event_op() AS op FROM foo`) defer closeFeed(t, foo) @@ -343,7 +343,7 @@ AS SELECT b||a AS ba, event_op() AS op FROM foo`) }) } - cdcTest(t, testFn, feedTestForceSink("webhook")) + cdcTest(t, testFn, feedTestForceSink("cloudstorage")) } // Same test as TestChangefeedBasicQueryWrapped, but this time using AVRO. @@ -577,7 +577,7 @@ func TestChangefeedDiff(t *testing.T) { sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`) sqlDB.Exec(t, `UPSERT INTO foo VALUES (0, 'updated')`) - foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH diff`) + foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH diff, format=parquet`) defer closeFeed(t, foo) // 'initial' is skipped because only the latest value ('updated') is @@ -609,7 +609,7 @@ func TestChangefeedDiff(t *testing.T) { }) } - cdcTest(t, testFn) + cdcTest(t, testFn, feedTestForceSink("cloudstorage")) } func TestChangefeedTenants(t *testing.T) { @@ -7208,7 +7208,7 @@ func TestChangefeedEndTime(t *testing.T) { sqlDB.Exec(t, "INSERT INTO foo VALUES (1), (2), (3)") fakeEndTime := s.Server.Clock().Now().Add(int64(time.Hour), 0).AsOfSystemTime() - feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH end_time = $1", fakeEndTime) + feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH end_time = $1, format=parquet", fakeEndTime) defer closeFeed(t, feed) assertPayloads(t, feed, []string{ @@ -7225,7 +7225,7 @@ func TestChangefeedEndTime(t *testing.T) { })) } - cdcTest(t, testFn, feedTestEnterpriseSinks) + cdcTest(t, testFn, feedTestForceSink("cloudstorage")) } func TestChangefeedEndTimeWithCursor(t *testing.T) { diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index 194f4011e234..857dfa3fd1c7 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -858,6 +858,9 @@ func randomSinkTypeWithOptions(options feedTestOptions) string { sinkWeights[sinkType] = 0 } } + if weight, ok := sinkWeights["cloudstorage"]; ok && weight != 0 { + sinkWeights = map[string]int{"cloudstorage": 1} + } weightTotal := 0 for _, weight := range sinkWeights { weightTotal += weight diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index 16e0eeeca5bd..42b8d8ce42f9 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -1076,7 +1076,7 @@ func (f *cloudFeedFactory) Feed( } } randNum := rand.Intn(5) - if randNum < 3 { + if randNum < 0 { parquetPossible = false } if parquetPossible {