changefeedccl: refactor parquet column iterators
Previously, some tests would fail due to invalid conversion
from parquet to JSON in testing. These failures revealed two
underlying issues with how columns were written:
- in tests watching a column family that does not contain the
  primary key of the table, we would not write the primary key
  to the parquet file
- in tests where the primary key was defined in an order
  different from the table's column order, the parquet testing
  code would not order the keys correctly

This change fixes the two issues above by (a) using the correct
iterators on `cdcevent.Row` and (b) writing more verbose
metadata to the parquet file for use in testing.

Epic: None
Release note: None
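
To illustrate the ordering issue with a hypothetical schema: for
`CREATE TABLE t (a INT, b INT, c INT, PRIMARY KEY (b, a))`, datums are
written to parquet in table order (a, b, c, then the event type column),
while tests must print the key in primary key order [b, a]. The new
metadata records each column's offset into the datum slice, loosely of
the form keyCols="b,1,a,0" and allCols="a,0,b,1,c,2,...", so the test
code can look datums up in either order. (The schema and serialized
strings here are illustrative, not taken from the patch.)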
jayshrivastava committed May 31, 2023
1 parent 29efad7 commit 875cbfa
Showing 3 changed files with 133 additions and 62 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -6088,6 +6088,7 @@ func TestChangefeedHandlesRollingRestart(t *testing.T) {
			DistSQL: &execinfra.TestingKnobs{
				DrainFast: true,
				Changefeed: &TestingKnobs{
+					EnableParquetMetadata: true,
					// Filter out draining nodes; normally we rely on dist sql planner
					// to do that for us.
					FilterDrainingNodes: func(
121 changes: 101 additions & 20 deletions pkg/ccl/changefeedccl/parquet.go
@@ -9,13 +9,16 @@
package changefeedccl

import (
+	"bytes"
	"io"
+	"strconv"
	"strings"

	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
	"github.com/cockroachdb/cockroach/pkg/sql/types"
	"github.com/cockroachdb/cockroach/pkg/util/parquet"
+	"github.com/cockroachdb/errors"
)

type parquetWriter struct {
@@ -26,23 +29,22 @@ type parquetWriter struct {
// newParquetSchemaDefintion returns a parquet schema definition based on the
// cdcevent.Row and the number of cols in the schema.
func newParquetSchemaDefintion(row cdcevent.Row) (*parquet.SchemaDefinition, int, error) {
-	numCols := len(row.ResultColumns()) + 1
-
-	columnNames := make([]string, numCols)
-	columnTypes := make([]*types.T, numCols)
-
-	idx := 0
-	if err := row.ForEachColumn().Col(func(col cdcevent.ResultColumn) error {
-		columnNames[idx] = col.Name
-		columnTypes[idx] = col.Typ
-		idx += 1
+	var columnNames []string
+	var columnTypes []*types.T
+
+	numCols := 0
+	if err := row.ForAllColumns().Col(func(col cdcevent.ResultColumn) error {
+		columnNames = append(columnNames, col.Name)
+		columnTypes = append(columnTypes, col.Typ)
+		numCols += 1
		return nil
	}); err != nil {
		return nil, 0, err
	}

-	columnNames[idx] = parquetCrdbEventTypeColName
-	columnTypes[idx] = types.String
+	columnNames = append(columnNames, parquetCrdbEventTypeColName)
+	columnTypes = append(columnTypes, types.String)
+	numCols += 1

	schemaDef, err := parquet.NewSchema(columnNames, columnTypes)
	if err != nil {
@@ -102,7 +104,7 @@ func (w *parquetWriter) close() error {
func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc []tree.Datum) error {
	datums := datumAlloc[:0]

-	if err := updatedRow.ForEachColumn().Datum(func(d tree.Datum, _ cdcevent.ResultColumn) error {
+	if err := updatedRow.ForAllColumns().Datum(func(d tree.Datum, _ cdcevent.ResultColumn) error {
		datums = append(datums, d)
		return nil
	}); err != nil {
@@ -114,24 +116,103 @@ func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc []tree.Datum) error {

// addParquetTestMetadata appends options to the provided options to configure the
// parquet writer to write metadata required by cdc test feed factories.
+//
+// Generally, cdc tests will convert the row to JSON loosely in the form:
+// `[0]->{"b": "b", "c": "c"}` with the key columns in square brackets and value
+// columns in a JSON object. The metadata generated by this function contains
+// key and value column names along with their offsets in the parquet file.
func addParquetTestMetadata(row cdcevent.Row, opts []parquet.Option) ([]parquet.Option, error) {
-	keyCols := make([]string, 0)
+	// NB: Order matters. When iterating using ForAllColumns, which is used when
+	// writing datums and defining the schema, the order of columns usually
+	// matches the underlying table. If a composite key is defined, the order in
+	// ForEachKeyColumn may not match. In tests, we want to use the latter
+	// order when printing the keys.
+	keyCols := map[string]int{}
+	var keysInOrder []string
	if err := row.ForEachKeyColumn().Col(func(col cdcevent.ResultColumn) error {
-		keyCols = append(keyCols, col.Name)
+		keyCols[col.Name] = -1
+		keysInOrder = append(keysInOrder, col.Name)
		return nil
	}); err != nil {
		return opts, err
	}
-	opts = append(opts, parquet.WithMetadata(map[string]string{"keyCols": strings.Join(keyCols, ",")}))

-	allCols := make([]string, 0)
+	// NB: We do not use ForAllColumns here because it will always contain the
+	// key. In tests where we target a column family without a key in it,
+	// ForEachColumn will exclude the primary key of the table, which is what
+	// we want.
+	valueCols := map[string]int{}
+	var valuesInOrder []string
	if err := row.ForEachColumn().Col(func(col cdcevent.ResultColumn) error {
-		allCols = append(allCols, col.Name)
+		valueCols[col.Name] = -1
+		valuesInOrder = append(valuesInOrder, col.Name)
		return nil
	}); err != nil {
		return opts, err
	}

+	// Iterate over ForAllColumns to determine the offsets of each column
+	// in a parquet row (i.e. the slice of datums provided to addData). We don't
+	// do this above because there is no way to determine it from
+	// cdcevent.ResultColumn. The Ordinal() method may return an invalid
+	// number for virtual columns.
+	idx := 0
+	if err := row.ForAllColumns().Col(func(col cdcevent.ResultColumn) error {
+		if _, colIsInKey := keyCols[col.Name]; colIsInKey {
+			keyCols[col.Name] = idx
+		}
+		if _, colIsInValue := valueCols[col.Name]; colIsInValue {
+			valueCols[col.Name] = idx
+		}
+		idx += 1
+		return nil
+	}); err != nil {
+		return opts, err
+	}
-	allCols = append(allCols, parquetCrdbEventTypeColName)
-	opts = append(opts, parquet.WithMetadata(map[string]string{"allCols": strings.Join(allCols, ",")}))
+	valuesInOrder = append(valuesInOrder, parquetCrdbEventTypeColName)
+	valueCols[parquetCrdbEventTypeColName] = idx
+	idx += 1
+
+	opts = append(opts, parquet.WithMetadata(map[string]string{"keyCols": serializeMap(keysInOrder, keyCols)}))
+	opts = append(opts, parquet.WithMetadata(map[string]string{"allCols": serializeMap(valuesInOrder, valueCols)}))
	return opts, nil
}

+// serializeMap serializes a map to a string. For example, orderedKeys=["b",
+// "a"], m={"a": 1, "b": 2, "c": 3} will return the string "b,2,a,1".
+func serializeMap(orderedKeys []string, m map[string]int) string {
+	var buf bytes.Buffer
+	for i, k := range orderedKeys {
+		if i > 0 {
+			buf.WriteString(",")
+		}
+		buf.WriteString(k)
+		buf.WriteString(",")
+		buf.WriteString(strconv.Itoa(m[k]))
+	}
+	return buf.String()
+}
+
+// deserializeMap deserializes a string in the form "b,2,a,1" and returns a map
+// representation along with the keys in order: m={"a": 1, "b": 2},
+// orderedKeys=["b", "a"].
+func deserializeMap(s string) (orderedKeys []string, m map[string]int, err error) {
+	keyValues := strings.Split(s, ",")
+	if len(keyValues)%2 != 0 {
+		return nil, nil,
+			errors.AssertionFailedf("list of elements %s should have an even length", s)
+	}
+	for i := 0; i < len(keyValues); i += 2 {
+		key := keyValues[i]
+		value, err := strconv.Atoi(keyValues[i+1])
+		if err != nil {
+			return nil, nil, err
+		}
+		orderedKeys = append(orderedKeys, key)
+		if i == 0 {
+			m = map[string]int{}
+		}
+		m[key] = value
+	}
+	return orderedKeys, m, nil
+}
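
As a usage sketch of the two helpers above, a round trip might look like
the following test, written for the same package so the unexported helpers
are in scope. This test is not part of the commit; the column names and
offsets are hypothetical.

package changefeedccl

import (
	"reflect"
	"testing"
)

// TestSerializeMapRoundTrip is a hypothetical sketch: key columns in
// PRIMARY KEY order map to their offsets in the parquet datum slice, and
// deserializeMap must recover both the order and the offsets.
func TestSerializeMapRoundTrip(t *testing.T) {
	orderedKeys := []string{"b", "a"}
	offsets := map[string]int{"a": 0, "b": 1}

	s := serializeMap(orderedKeys, offsets)
	if s != "b,1,a,0" {
		t.Fatalf("unexpected serialization: %q", s)
	}

	gotKeys, gotOffsets, err := deserializeMap(s)
	if err != nil {
		t.Fatal(err)
	}
	if !reflect.DeepEqual(gotKeys, orderedKeys) || !reflect.DeepEqual(gotOffsets, offsets) {
		t.Fatalf("round trip mismatch: %v, %v", gotKeys, gotOffsets)
	}
}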
73 changes: 31 additions & 42 deletions pkg/ccl/changefeedccl/testfeed_test.go
@@ -1053,13 +1053,11 @@ func (f *cloudFeedFactory) Feed(
	// being created with incompatible options. If it can be enabled, we will use
	// parquet format with a probability of 0.4.
	parquetPossible := true
-
	explicitEnvelope := false
	for _, opt := range createStmt.Options {
		if string(opt.Key) == changefeedbase.OptEnvelope {
			explicitEnvelope = true
		}
-
		if string(opt.Key) == changefeedbase.OptFormat &&
			opt.Value.String() != string(changefeedbase.OptFormatParquet) {
			parquetPossible = false
@@ -1238,59 +1236,50 @@ func (c *cloudFeed) appendParquetTestFeedMessages(
		return errors.Errorf("could not find column names in parquet metadata")
	}

-	primaryKeys := strings.Split(primaryKeyColumnsString, ",")
-	columns := strings.Split(columnsNamesString, ",")
-
-	columnNameSet := make(map[string]struct{})
-	primaryKeyColumnSet := make(map[string]struct{})
-
-	for _, key := range primaryKeys {
-		primaryKeyColumnSet[key] = struct{}{}
+	primaryKeysNamesOrdered, primaryKeyColumnSet, err := deserializeMap(primaryKeyColumnsString)
+	if err != nil {
+		return err
	}

-	// Drop parquetCrdbEventTypeColName.
-	for _, key := range columns[:len(columns)-1] {
-		columnNameSet[key] = struct{}{}
+	valueColumnNamesOrdered, columnNameSet, err := deserializeMap(columnsNamesString)
+	if err != nil {
+		return err
	}

	for _, row := range datums {
-		isDeleted := false
-
		// Remove parquetCrdbEventTypeColName from the column names list. Values
		// for this column will still be present in datum rows.
-		rowCopy := make([]string, len(columns)-1)
-		copy(rowCopy, columns[:len(columns)-1])
+		rowCopy := make([]string, len(valueColumnNamesOrdered)-1)
+		copy(rowCopy, valueColumnNamesOrdered[:len(valueColumnNamesOrdered)-1])
		rowJSONBuilder, err := json.NewFixedKeysObjectBuilder(rowCopy)
		if err != nil {
			return err
		}

-		keyJSONBuilder := json.NewArrayBuilder(len(primaryKeys))
-		for colIdx, v := range row {
-			k := columns[colIdx]
-			if k == parquetCrdbEventTypeColName {
-				if *v.(*tree.DString) == *parquetEventDelete.DString() {
-					isDeleted = true
-				}
-				continue
-			}
+		keyJSONBuilder := json.NewArrayBuilder(len(primaryKeysNamesOrdered))

-			if _, ok := columnNameSet[k]; ok {
-				j, err := tree.AsJSON(v, sessiondatapb.DataConversionConfig{}, time.UTC)
-				if err != nil {
-					return err
-				}
-				if err := rowJSONBuilder.Set(k, j); err != nil {
-					return err
-				}
+		isDeleted := false
+
+		for _, primaryKeyColumnName := range primaryKeysNamesOrdered {
+			datum := row[primaryKeyColumnSet[primaryKeyColumnName]]
+			j, err := tree.AsJSON(datum, sessiondatapb.DataConversionConfig{}, time.UTC)
+			if err != nil {
+				return err
			}
+			keyJSONBuilder.Add(j)
+		}

-			if _, ok := primaryKeyColumnSet[k]; ok {
-				j, err := tree.AsJSON(v, sessiondatapb.DataConversionConfig{}, time.UTC)
-				if err != nil {
-					return err
+		for _, valueColumnName := range valueColumnNamesOrdered {
+			if valueColumnName == parquetCrdbEventTypeColName {
+				if *(row[columnNameSet[valueColumnName]].(*tree.DString)) == *parquetEventDelete.DString() {
+					isDeleted = true
				}
-				keyJSONBuilder.Add(j)
+				break
+			}
+			datum := row[columnNameSet[valueColumnName]]
+			j, err := tree.AsJSON(datum, sessiondatapb.DataConversionConfig{}, time.UTC)
+			if err != nil {
+				return err
+			}
+			if err := rowJSONBuilder.Set(valueColumnName, j); err != nil {
+				return err
			}
		}

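End to end, for the hypothetical (b, a) primary key above, a row with
a=1, b=2, c=3 would now be rendered by the test feed as
`[2, 1]->{"a": 1, "b": 2, "c": 3}`: the key array follows the key order
recorded in the metadata, value columns follow the value order, and the
event type datum is consumed as a deletion flag rather than rendered as a
value (handling of deletes continues in the truncated portion of this diff).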
