From 29efad766f405200d1add5b94b585130a179573e Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Thu, 25 May 2023 17:14:24 -0400 Subject: [PATCH 1/2] Revert "do not merge: always force parquet format in cdc tests if possible" This reverts commit 63eccd4a513dfea440a2f7d6176d759e4c8a9eba. Release note: None Epic: None --- pkg/ccl/changefeedccl/testfeed_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index 09a3ed34a97f..54bc3de80737 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -1073,7 +1073,7 @@ func (f *cloudFeedFactory) Feed( } } randNum := rand.Intn(5) - if randNum < 5 { + if randNum < 2 { parquetPossible = false } if parquetPossible { From 875cbfa69ac7d917c8ce4065ea21423472c9b989 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Wed, 31 May 2023 11:16:45 -0400 Subject: [PATCH 2/2] changefeedccl: refactor parquet column iterators Previously, some tests would fail due to invalid conversion from parquet to JSON in testing. These failures showed two underlying issues with how columns were written: - in tests watching a column family not containing the primary key of the table, we would not write the primary key to the parquet file - in tests where the primary key was defined with a particular order which is different than the table, the parquet testing code would not order the keys correctly This change fixes the two above issues by (a) using the correct iterators on `cdcevent.Row` and writing more verbose metadata to the parquet file to use in testing. Epic: None Release note: None --- pkg/ccl/changefeedccl/changefeed_test.go | 1 + pkg/ccl/changefeedccl/parquet.go | 121 +++++++++++++++++++---- pkg/ccl/changefeedccl/testfeed_test.go | 73 ++++++-------- 3 files changed, 133 insertions(+), 62 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index b83bd2ed4fff..e18a622a2fb7 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -6088,6 +6088,7 @@ func TestChangefeedHandlesRollingRestart(t *testing.T) { DistSQL: &execinfra.TestingKnobs{ DrainFast: true, Changefeed: &TestingKnobs{ + EnableParquetMetadata: true, // Filter out draining nodes; normally we rely on dist sql planner // to do that for us. FilterDrainingNodes: func( diff --git a/pkg/ccl/changefeedccl/parquet.go b/pkg/ccl/changefeedccl/parquet.go index 3fc98aa8d5c7..632f78227e3a 100644 --- a/pkg/ccl/changefeedccl/parquet.go +++ b/pkg/ccl/changefeedccl/parquet.go @@ -9,13 +9,16 @@ package changefeedccl import ( + "bytes" "io" + "strconv" "strings" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/parquet" + "github.com/cockroachdb/errors" ) type parquetWriter struct { @@ -26,23 +29,22 @@ type parquetWriter struct { // newParquetSchemaDefintion returns a parquet schema definition based on the // cdcevent.Row and the number of cols in the schema. func newParquetSchemaDefintion(row cdcevent.Row) (*parquet.SchemaDefinition, int, error) { - numCols := len(row.ResultColumns()) + 1 - - columnNames := make([]string, numCols) - columnTypes := make([]*types.T, numCols) - - idx := 0 - if err := row.ForEachColumn().Col(func(col cdcevent.ResultColumn) error { - columnNames[idx] = col.Name - columnTypes[idx] = col.Typ - idx += 1 + var columnNames []string + var columnTypes []*types.T + + numCols := 0 + if err := row.ForAllColumns().Col(func(col cdcevent.ResultColumn) error { + columnNames = append(columnNames, col.Name) + columnTypes = append(columnTypes, col.Typ) + numCols += 1 return nil }); err != nil { return nil, 0, err } - columnNames[idx] = parquetCrdbEventTypeColName - columnTypes[idx] = types.String + columnNames = append(columnNames, parquetCrdbEventTypeColName) + columnTypes = append(columnTypes, types.String) + numCols += 1 schemaDef, err := parquet.NewSchema(columnNames, columnTypes) if err != nil { @@ -102,7 +104,7 @@ func (w *parquetWriter) close() error { func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc []tree.Datum) error { datums := datumAlloc[:0] - if err := updatedRow.ForEachColumn().Datum(func(d tree.Datum, _ cdcevent.ResultColumn) error { + if err := updatedRow.ForAllColumns().Datum(func(d tree.Datum, _ cdcevent.ResultColumn) error { datums = append(datums, d) return nil }); err != nil { @@ -114,24 +116,103 @@ func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc [] // addParquetTestMetadata appends options to the provided options to configure the // parquet writer to write metadata required by cdc test feed factories. +// +// Generally, cdc tests will convert the row to JSON loosely in the form: +// `[0]->{"b": "b", "c": "c"}` with the key columns in square brackets and value +// columns in a JSON object. The metadata generated by this function contains +// key and value column names along with their offsets in the parquet file. func addParquetTestMetadata(row cdcevent.Row, opts []parquet.Option) ([]parquet.Option, error) { - keyCols := make([]string, 0) + // NB: Order matters. When iterating using ForAllColumns, which is used when + // writing datums and defining the schema, the order of columns usually + // matches the underlying table. If a composite keys defined, the order in + // ForEachKeyColumn may not match. In tests, we want to use the latter + // order when printing the keys. + keyCols := map[string]int{} + var keysInOrder []string if err := row.ForEachKeyColumn().Col(func(col cdcevent.ResultColumn) error { - keyCols = append(keyCols, col.Name) + keyCols[col.Name] = -1 + keysInOrder = append(keysInOrder, col.Name) return nil }); err != nil { return opts, err } - opts = append(opts, parquet.WithMetadata(map[string]string{"keyCols": strings.Join(keyCols, ",")})) - allCols := make([]string, 0) + // NB: We do not use ForAllColumns here because it will always contain the + // key. In tests where we target a column family without a key in it, + // ForEachColumn will exclude the primary key of the table, which is what + // we want. + valueCols := map[string]int{} + var valuesInOrder []string if err := row.ForEachColumn().Col(func(col cdcevent.ResultColumn) error { - allCols = append(allCols, col.Name) + valueCols[col.Name] = -1 + valuesInOrder = append(valuesInOrder, col.Name) + return nil + }); err != nil { + return opts, err + } + + // Iterate over ForAllColumns to determine the offets of each column + // in a parquet row (ie. the slice of datums provided to addData). We don't + // do this above because there is no way to determine it from + // cdcevent.ResultColumn. The Ordinal() method may return an invalid + // number for virtual columns. + idx := 0 + if err := row.ForAllColumns().Col(func(col cdcevent.ResultColumn) error { + if _, colIsInKey := keyCols[col.Name]; colIsInKey { + keyCols[col.Name] = idx + } + if _, colIsInValue := valueCols[col.Name]; colIsInValue { + valueCols[col.Name] = idx + } + idx += 1 return nil }); err != nil { return opts, err } - allCols = append(allCols, parquetCrdbEventTypeColName) - opts = append(opts, parquet.WithMetadata(map[string]string{"allCols": strings.Join(allCols, ",")})) + valuesInOrder = append(valuesInOrder, parquetCrdbEventTypeColName) + valueCols[parquetCrdbEventTypeColName] = idx + idx += 1 + + opts = append(opts, parquet.WithMetadata(map[string]string{"keyCols": serializeMap(keysInOrder, keyCols)})) + opts = append(opts, parquet.WithMetadata(map[string]string{"allCols": serializeMap(valuesInOrder, valueCols)})) return opts, nil } + +// serializeMap serializes a map to a string. For example, orderedKeys=["b", +// "a"] m={"a": 1", "b": 2, "c":3} will return the string "b,2,a,1". +func serializeMap(orderedKeys []string, m map[string]int) string { + var buf bytes.Buffer + for i, k := range orderedKeys { + if i > 0 { + buf.WriteString(",") + } + buf.WriteString(k) + buf.WriteString(",") + buf.WriteString(strconv.Itoa(m[k])) + } + return buf.String() +} + +// deserializeMap deserializes a string in the form "b,2,a,1" and returns a map +// representation along with the keys in order: m={"a": 1", "b": 2} +// orderedKeys=["b", "a"]. +func deserializeMap(s string) (orderedKeys []string, m map[string]int, err error) { + keyValues := strings.Split(s, ",") + if len(keyValues)%2 != 0 { + return nil, nil, + errors.AssertionFailedf("list of elements %s should have an even length", s) + } + for i := 0; i < len(keyValues); i += 2 { + key := keyValues[i] + value, err := strconv.Atoi(keyValues[i+1]) + if err != nil { + return nil, nil, err + } + orderedKeys = append(orderedKeys, key) + if i == 0 { + m = map[string]int{} + } + m[key] = value + } + return orderedKeys, m, nil +} diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index 54bc3de80737..b53060e79ced 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -1053,13 +1053,11 @@ func (f *cloudFeedFactory) Feed( // being created with incompatible options. If it can be enabled, we will use // parquet format with a probability of 0.4. parquetPossible := true - explicitEnvelope := false for _, opt := range createStmt.Options { if string(opt.Key) == changefeedbase.OptEnvelope { explicitEnvelope = true } - if string(opt.Key) == changefeedbase.OptFormat && opt.Value.String() != string(changefeedbase.OptFormatParquet) { parquetPossible = false @@ -1238,59 +1236,50 @@ func (c *cloudFeed) appendParquetTestFeedMessages( return errors.Errorf("could not find column names in parquet metadata") } - primaryKeys := strings.Split(primaryKeyColumnsString, ",") - columns := strings.Split(columnsNamesString, ",") - - columnNameSet := make(map[string]struct{}) - primaryKeyColumnSet := make(map[string]struct{}) - - for _, key := range primaryKeys { - primaryKeyColumnSet[key] = struct{}{} + primaryKeysNamesOrdered, primaryKeyColumnSet, err := deserializeMap(primaryKeyColumnsString) + if err != nil { + return err } - - // Drop parquetCrdbEventTypeColName. - for _, key := range columns[:len(columns)-1] { - columnNameSet[key] = struct{}{} + valueColumnNamesOrdered, columnNameSet, err := deserializeMap(columnsNamesString) + if err != nil { + return err } for _, row := range datums { - isDeleted := false - - // Remove parquetCrdbEventTypeColName from the column names list. Values - // for this column will still be present in datum rows. - rowCopy := make([]string, len(columns)-1) - copy(rowCopy, columns[:len(columns)-1]) + rowCopy := make([]string, len(valueColumnNamesOrdered)-1) + copy(rowCopy, valueColumnNamesOrdered[:len(valueColumnNamesOrdered)-1]) rowJSONBuilder, err := json.NewFixedKeysObjectBuilder(rowCopy) if err != nil { return err } - keyJSONBuilder := json.NewArrayBuilder(len(primaryKeys)) - for colIdx, v := range row { - k := columns[colIdx] - if k == parquetCrdbEventTypeColName { - if *v.(*tree.DString) == *parquetEventDelete.DString() { - isDeleted = true - } - continue - } + keyJSONBuilder := json.NewArrayBuilder(len(primaryKeysNamesOrdered)) - if _, ok := columnNameSet[k]; ok { - j, err := tree.AsJSON(v, sessiondatapb.DataConversionConfig{}, time.UTC) - if err != nil { - return err - } - if err := rowJSONBuilder.Set(k, j); err != nil { - return err - } + isDeleted := false + + for _, primaryKeyColumnName := range primaryKeysNamesOrdered { + datum := row[primaryKeyColumnSet[primaryKeyColumnName]] + j, err := tree.AsJSON(datum, sessiondatapb.DataConversionConfig{}, time.UTC) + if err != nil { + return err } + keyJSONBuilder.Add(j) - if _, ok := primaryKeyColumnSet[k]; ok { - j, err := tree.AsJSON(v, sessiondatapb.DataConversionConfig{}, time.UTC) - if err != nil { - return err + } + for _, valueColumnName := range valueColumnNamesOrdered { + if valueColumnName == parquetCrdbEventTypeColName { + if *(row[columnNameSet[valueColumnName]].(*tree.DString)) == *parquetEventDelete.DString() { + isDeleted = true } - keyJSONBuilder.Add(j) + break + } + datum := row[columnNameSet[valueColumnName]] + j, err := tree.AsJSON(datum, sessiondatapb.DataConversionConfig{}, time.UTC) + if err != nil { + return err + } + if err := rowJSONBuilder.Set(valueColumnName, j); err != nil { + return err } }