Revert "do not merge: always force parquet format in cdc tests" #103928

Merged
merged 2 commits into from
Jun 1, 2023
Merged
Changes from all commits
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -6088,6 +6088,7 @@ func TestChangefeedHandlesRollingRestart(t *testing.T) {
DistSQL: &execinfra.TestingKnobs{
DrainFast: true,
Changefeed: &TestingKnobs{
+EnableParquetMetadata: true,
// Filter out draining nodes; normally we rely on dist sql planner
// to do that for us.
FilterDrainingNodes: func(
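For context on the knob flipped above: the cloud feed factory can only decode parquet files when the writer embeds the test metadata produced by addParquetTestMetadata (see parquet.go below). Here is a minimal sketch of how such a knob can gate the option injection; makeWriterOpts is a hypothetical name for illustration, not code from this PR.

func makeWriterOpts(row cdcevent.Row, knobs *TestingKnobs) ([]parquet.Option, error) {
	var opts []parquet.Option
	// Only write test-only metadata when the testing knob asks for it,
	// so production changefeeds pay no extra cost.
	if knobs != nil && knobs.EnableParquetMetadata {
		var err error
		opts, err = addParquetTestMetadata(row, opts)
		if err != nil {
			return nil, err
		}
	}
	return opts, nil
}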
121 changes: 101 additions & 20 deletions pkg/ccl/changefeedccl/parquet.go
@@ -9,13 +9,16 @@
package changefeedccl

import (
+"bytes"
"io"
+"strconv"
"strings"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/parquet"
+"github.com/cockroachdb/errors"
)

type parquetWriter struct {
@@ -26,23 +29,22 @@ type parquetWriter struct {
// newParquetSchemaDefintion returns a parquet schema definition based on the
// cdcevent.Row and the number of cols in the schema.
func newParquetSchemaDefintion(row cdcevent.Row) (*parquet.SchemaDefinition, int, error) {
-numCols := len(row.ResultColumns()) + 1
-
-columnNames := make([]string, numCols)
-columnTypes := make([]*types.T, numCols)
-
-idx := 0
-if err := row.ForEachColumn().Col(func(col cdcevent.ResultColumn) error {
-columnNames[idx] = col.Name
-columnTypes[idx] = col.Typ
-idx += 1
+var columnNames []string
+var columnTypes []*types.T
+
+numCols := 0
+if err := row.ForAllColumns().Col(func(col cdcevent.ResultColumn) error {
+columnNames = append(columnNames, col.Name)
+columnTypes = append(columnTypes, col.Typ)
+numCols += 1
return nil
}); err != nil {
return nil, 0, err
}

-columnNames[idx] = parquetCrdbEventTypeColName
-columnTypes[idx] = types.String
+columnNames = append(columnNames, parquetCrdbEventTypeColName)
+columnTypes = append(columnTypes, types.String)
+numCols += 1

schemaDef, err := parquet.NewSchema(columnNames, columnTypes)
if err != nil {
@@ -102,7 +104,7 @@ func (w *parquetWriter) close() error {
func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc []tree.Datum) error {
datums := datumAlloc[:0]

-if err := updatedRow.ForEachColumn().Datum(func(d tree.Datum, _ cdcevent.ResultColumn) error {
+if err := updatedRow.ForAllColumns().Datum(func(d tree.Datum, _ cdcevent.ResultColumn) error {
datums = append(datums, d)
return nil
}); err != nil {
@@ -114,24 +116,103 @@ func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc []

// addParquetTestMetadata appends options to the provided options to configure the
// parquet writer to write metadata required by cdc test feed factories.
+//
+// Generally, cdc tests will convert the row to JSON loosely in the form:
+// `[0]->{"b": "b", "c": "c"}`, with the key columns in square brackets and the
+// value columns in a JSON object. The metadata generated by this function
+// contains key and value column names along with their offsets in the parquet
+// file.
func addParquetTestMetadata(row cdcevent.Row, opts []parquet.Option) ([]parquet.Option, error) {
-keyCols := make([]string, 0)
+// NB: Order matters. When iterating using ForAllColumns, which is used when
+// writing datums and defining the schema, the order of columns usually
+// matches the underlying table. If a composite key is defined, the order in
+// ForEachKeyColumn may not match. In tests, we want to use the latter
+// order when printing the keys.
+keyCols := map[string]int{}
+var keysInOrder []string
if err := row.ForEachKeyColumn().Col(func(col cdcevent.ResultColumn) error {
-keyCols = append(keyCols, col.Name)
+keyCols[col.Name] = -1
+keysInOrder = append(keysInOrder, col.Name)
return nil
}); err != nil {
return opts, err
}
-opts = append(opts, parquet.WithMetadata(map[string]string{"keyCols": strings.Join(keyCols, ",")}))

-allCols := make([]string, 0)
+// NB: We do not use ForAllColumns here because it will always contain the
+// key. In tests where we target a column family without a key in it,
+// ForEachColumn will exclude the primary key of the table, which is what
+// we want.
+valueCols := map[string]int{}
+var valuesInOrder []string
if err := row.ForEachColumn().Col(func(col cdcevent.ResultColumn) error {
-allCols = append(allCols, col.Name)
+valueCols[col.Name] = -1
+valuesInOrder = append(valuesInOrder, col.Name)
return nil
}); err != nil {
return opts, err
}

+// Iterate over ForAllColumns to determine the offsets of each column
+// in a parquet row (i.e. the slice of datums provided to addData). We don't
+// do this above because there is no way to determine it from
+// cdcevent.ResultColumn. The Ordinal() method may return an invalid
+// number for virtual columns.
+idx := 0
+if err := row.ForAllColumns().Col(func(col cdcevent.ResultColumn) error {
+if _, colIsInKey := keyCols[col.Name]; colIsInKey {
+keyCols[col.Name] = idx
+}
+if _, colIsInValue := valueCols[col.Name]; colIsInValue {
+valueCols[col.Name] = idx
+}
+idx += 1
+return nil
+}); err != nil {
+return opts, err
+}
-allCols = append(allCols, parquetCrdbEventTypeColName)
-opts = append(opts, parquet.WithMetadata(map[string]string{"allCols": strings.Join(allCols, ",")}))
+valuesInOrder = append(valuesInOrder, parquetCrdbEventTypeColName)
+valueCols[parquetCrdbEventTypeColName] = idx
+idx += 1
+
+opts = append(opts, parquet.WithMetadata(map[string]string{"keyCols": serializeMap(keysInOrder, keyCols)}))
+opts = append(opts, parquet.WithMetadata(map[string]string{"allCols": serializeMap(valuesInOrder, valueCols)}))
return opts, nil
}
+
+// serializeMap serializes a map to a string. For example, orderedKeys=["b",
+// "a"], m={"a": 1, "b": 2, "c": 3} will return the string "b,2,a,1".
+func serializeMap(orderedKeys []string, m map[string]int) string {
+var buf bytes.Buffer
+for i, k := range orderedKeys {
+if i > 0 {
+buf.WriteString(",")
+}
+buf.WriteString(k)
+buf.WriteString(",")
+buf.WriteString(strconv.Itoa(m[k]))
+}
+return buf.String()
+}
+
+// deserializeMap deserializes a string in the form "b,2,a,1" and returns a map
+// representation along with the keys in order: m={"a": 1, "b": 2},
+// orderedKeys=["b", "a"].
+func deserializeMap(s string) (orderedKeys []string, m map[string]int, err error) {
+keyValues := strings.Split(s, ",")
+if len(keyValues)%2 != 0 {
+return nil, nil,
+errors.AssertionFailedf("list of elements %s should have an even length", s)
+}
+for i := 0; i < len(keyValues); i += 2 {
+key := keyValues[i]
+value, err := strconv.Atoi(keyValues[i+1])
+if err != nil {
+return nil, nil, err
+}
+orderedKeys = append(orderedKeys, key)
+if i == 0 {
+m = map[string]int{}
+}
+m[key] = value
+}
+return orderedKeys, m, nil
+}
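The pair of helpers above defines the metadata wire format: an ordered, comma-separated list of alternating column names and offsets. Below is a standalone round-trip check; the helper bodies are lightly adapted from the diff above (errors.AssertionFailedf becomes fmt.Errorf, and the map is allocated up front) so the snippet compiles with no CockroachDB dependency.

package main

import (
	"bytes"
	"fmt"
	"strconv"
	"strings"
)

// serializeMap writes orderedKeys and their offsets as "k1,v1,k2,v2,...".
func serializeMap(orderedKeys []string, m map[string]int) string {
	var buf bytes.Buffer
	for i, k := range orderedKeys {
		if i > 0 {
			buf.WriteString(",")
		}
		buf.WriteString(k)
		buf.WriteString(",")
		buf.WriteString(strconv.Itoa(m[k]))
	}
	return buf.String()
}

// deserializeMap parses "k1,v1,k2,v2,..." back into ordered keys and a map.
func deserializeMap(s string) ([]string, map[string]int, error) {
	keyValues := strings.Split(s, ",")
	if len(keyValues)%2 != 0 {
		return nil, nil, fmt.Errorf("list of elements %s should have an even length", s)
	}
	var orderedKeys []string
	m := map[string]int{}
	for i := 0; i < len(keyValues); i += 2 {
		value, err := strconv.Atoi(keyValues[i+1])
		if err != nil {
			return nil, nil, err
		}
		orderedKeys = append(orderedKeys, keyValues[i])
		m[keyValues[i]] = value
	}
	return orderedKeys, m, nil
}

func main() {
	s := serializeMap([]string{"b", "a"}, map[string]int{"a": 1, "b": 2})
	fmt.Println(s) // b,2,a,1
	keys, m, err := deserializeMap(s)
	fmt.Println(keys, m, err) // [b a] map[a:1 b:2] <nil>
}

One caveat worth noting: the format assumes column names never contain commas; the even-length check catches truncation but not an embedded separator.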
75 changes: 32 additions & 43 deletions pkg/ccl/changefeedccl/testfeed_test.go
@@ -1053,13 +1053,11 @@ func (f *cloudFeedFactory) Feed(
// being created with incompatible options. If it can be enabled, we will use
// parquet format with a probability of 0.4.
parquetPossible := true
-
explicitEnvelope := false
for _, opt := range createStmt.Options {
if string(opt.Key) == changefeedbase.OptEnvelope {
explicitEnvelope = true
}
-
if string(opt.Key) == changefeedbase.OptFormat &&
opt.Value.String() != string(changefeedbase.OptFormatParquet) {
parquetPossible = false
@@ -1073,7 +1071,7 @@
}
}
randNum := rand.Intn(5)
-if randNum < 5 {
+if randNum < 2 {
parquetPossible = false
}
if parquetPossible {
@@ -1238,59 +1236,50 @@ func (c *cloudFeed) appendParquetTestFeedMessages(
return errors.Errorf("could not find column names in parquet metadata")
}

-primaryKeys := strings.Split(primaryKeyColumnsString, ",")
-columns := strings.Split(columnsNamesString, ",")
-
-columnNameSet := make(map[string]struct{})
-primaryKeyColumnSet := make(map[string]struct{})
-
-for _, key := range primaryKeys {
-primaryKeyColumnSet[key] = struct{}{}
+primaryKeysNamesOrdered, primaryKeyColumnSet, err := deserializeMap(primaryKeyColumnsString)
+if err != nil {
+return err
}

-// Drop parquetCrdbEventTypeColName.
-for _, key := range columns[:len(columns)-1] {
-columnNameSet[key] = struct{}{}
+valueColumnNamesOrdered, columnNameSet, err := deserializeMap(columnsNamesString)
+if err != nil {
+return err
}

for _, row := range datums {
-isDeleted := false
-
// Remove parquetCrdbEventTypeColName from the column names list. Values
// for this column will still be present in datum rows.
-rowCopy := make([]string, len(columns)-1)
-copy(rowCopy, columns[:len(columns)-1])
+rowCopy := make([]string, len(valueColumnNamesOrdered)-1)
+copy(rowCopy, valueColumnNamesOrdered[:len(valueColumnNamesOrdered)-1])
rowJSONBuilder, err := json.NewFixedKeysObjectBuilder(rowCopy)
if err != nil {
return err
}

-keyJSONBuilder := json.NewArrayBuilder(len(primaryKeys))
-for colIdx, v := range row {
-k := columns[colIdx]
-if k == parquetCrdbEventTypeColName {
-if *v.(*tree.DString) == *parquetEventDelete.DString() {
-isDeleted = true
-}
-continue
-}
-
-if _, ok := columnNameSet[k]; ok {
-j, err := tree.AsJSON(v, sessiondatapb.DataConversionConfig{}, time.UTC)
-if err != nil {
-return err
-}
-if err := rowJSONBuilder.Set(k, j); err != nil {
-return err
-}
-}
-
-if _, ok := primaryKeyColumnSet[k]; ok {
-j, err := tree.AsJSON(v, sessiondatapb.DataConversionConfig{}, time.UTC)
-if err != nil {
-return err
-}
-keyJSONBuilder.Add(j)
-}
-}
+keyJSONBuilder := json.NewArrayBuilder(len(primaryKeysNamesOrdered))
+
+isDeleted := false
+
+for _, primaryKeyColumnName := range primaryKeysNamesOrdered {
+datum := row[primaryKeyColumnSet[primaryKeyColumnName]]
+j, err := tree.AsJSON(datum, sessiondatapb.DataConversionConfig{}, time.UTC)
+if err != nil {
+return err
+}
+keyJSONBuilder.Add(j)
+}
+
+for _, valueColumnName := range valueColumnNamesOrdered {
+if valueColumnName == parquetCrdbEventTypeColName {
+if *(row[columnNameSet[valueColumnName]].(*tree.DString)) == *parquetEventDelete.DString() {
+isDeleted = true
+}
+break
+}
+datum := row[columnNameSet[valueColumnName]]
+j, err := tree.AsJSON(datum, sessiondatapb.DataConversionConfig{}, time.UTC)
+if err != nil {
+return err
+}
+if err := rowJSONBuilder.Set(valueColumnName, j); err != nil {
+return err
+}
+}

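To make the consumption path concrete, here is an illustrative sketch (not the repo's code) of how one parquet datum row is turned back into the `[key]->{...}` form using the deserialized offset maps. It assumes deserializeMap from parquet.go above is in scope plus fmt and strings imports; the column names, offsets, and the "event_type" sentinel are hypothetical stand-ins, and real rows hold tree.Datum values rather than strings.

func renderTestRow(row []string) (string, error) {
	// Metadata as addParquetTestMetadata might serialize it for a
	// hypothetical table t(a INT PRIMARY KEY, b STRING).
	keysInOrder, keyOffsets, err := deserializeMap("a,0")
	if err != nil {
		return "", err
	}
	valuesInOrder, valueOffsets, err := deserializeMap("a,0,b,1,event_type,2")
	if err != nil {
		return "", err
	}
	var keyParts, valParts []string
	for _, k := range keysInOrder {
		keyParts = append(keyParts, row[keyOffsets[k]])
	}
	for _, v := range valuesInOrder {
		if v == "event_type" {
			continue // the sentinel flags deletes; it is not printed as a value
		}
		valParts = append(valParts, fmt.Sprintf("%q: %q", v, row[valueOffsets[v]]))
	}
	return fmt.Sprintf("[%s]->{%s}", strings.Join(keyParts, ", "), strings.Join(valParts, ", ")), nil
}

// renderTestRow([]string{"1", "hello", "c"}) returns `[1]->{"a": "1", "b": "hello"}`.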