diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel
index c180f3344f40..e7dc09306fc6 100644
--- a/pkg/ccl/changefeedccl/BUILD.bazel
+++ b/pkg/ccl/changefeedccl/BUILD.bazel
@@ -303,6 +303,7 @@ go_test(
         "//pkg/util/syncutil",
         "//pkg/util/timeutil",
         "//pkg/util/timeutil/pgdate",
+        "//pkg/util/uuid",
         "//pkg/workload/bank",
         "//pkg/workload/ledger",
         "//pkg/workload/workloadsql",
diff --git a/pkg/ccl/changefeedccl/cdctest/nemeses.go b/pkg/ccl/changefeedccl/cdctest/nemeses.go
index bb1612fcd2fb..93b0e498693b 100644
--- a/pkg/ccl/changefeedccl/cdctest/nemeses.go
+++ b/pkg/ccl/changefeedccl/cdctest/nemeses.go
@@ -154,7 +154,12 @@ func RunNemesis(f TestFeedFactory, db *gosql.DB, isSinkless bool) (Validator, er
 		}
 	}
 
-	foo, err := f.Feed(`CREATE CHANGEFEED FOR foo WITH updated, resolved, diff`)
+	// Run the changefeed with the parquet format half of the time.
+	withFormatParquet := ""
+	if rand.Intn(2) == 0 {
+		withFormatParquet = ", format=parquet"
+	}
+	foo, err := f.Feed(fmt.Sprintf(`CREATE CHANGEFEED FOR foo WITH updated, resolved, diff%s`, withFormatParquet))
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go
index 18f32670fd1f..8bed298bcdb1 100644
--- a/pkg/ccl/changefeedccl/changefeed_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_test.go
@@ -270,11 +270,7 @@ func TestChangefeedBasicQuery(t *testing.T) {
 		sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
 		sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`)
 		sqlDB.Exec(t, `UPSERT INTO foo VALUES (0, 'updated')`)
-		// Currently, parquet format (which may be injected by feed() call, doesn't
-		// know how to handle tuple types (cdc_prev); so, force JSON format.
-		foo := feed(t, f, `
-CREATE CHANGEFEED WITH format='json'
-AS SELECT *, event_op() AS op, cdc_prev FROM foo`)
+		foo := feed(t, f, `CREATE CHANGEFEED AS SELECT *, event_op() AS op, cdc_prev FROM foo`)
 		defer closeFeed(t, foo)
 
 		// 'initial' is skipped because only the latest value ('updated') is
@@ -319,7 +315,7 @@ func TestChangefeedBasicQueryWrapped(t *testing.T) {
-		// Currently, parquet format (which may be injected by feed() call), doesn't
-		// know how to handle tuple types (cdc_prev); so, force JSON format.
+		// Force the parquet format, which now supports the diff option, to
+		// get coverage for it together with a projection.
 		foo := feed(t, f, `
-CREATE CHANGEFEED WITH envelope='wrapped', format='json', diff
+CREATE CHANGEFEED WITH envelope='wrapped', format='parquet', diff
 AS SELECT b||a AS ba, event_op() AS op FROM foo`)
 		defer closeFeed(t, foo)
 
@@ -348,7 +344,7 @@ AS SELECT b||a AS ba, event_op() AS op FROM foo`)
 		})
 	}
 
-	cdcTest(t, testFn, feedTestForceSink("webhook"))
+	cdcTest(t, testFn, feedTestForceSink("cloudstorage"))
 }
 
 // Same test as TestChangefeedBasicQueryWrapped, but this time using AVRO.
@@ -582,7 +578,7 @@ func TestChangefeedDiff(t *testing.T) {
 		sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`)
 		sqlDB.Exec(t, `UPSERT INTO foo VALUES (0, 'updated')`)
 
-		foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH diff`)
+		foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH diff, format=parquet`)
 		defer closeFeed(t, foo)
 
 		// 'initial' is skipped because only the latest value ('updated') is
@@ -614,7 +610,7 @@ func TestChangefeedDiff(t *testing.T) {
 		})
 	}
 
-	cdcTest(t, testFn)
+	cdcTest(t, testFn, feedTestForceSink("cloudstorage"))
 }
 
 func TestChangefeedTenants(t *testing.T) {
@@ -7215,7 +7211,7 @@ func TestChangefeedEndTime(t *testing.T) {
 		sqlDB.Exec(t, "INSERT INTO foo VALUES (1), (2), (3)")
 
 		fakeEndTime := s.Server.Clock().Now().Add(int64(time.Hour), 0).AsOfSystemTime()
-		feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH end_time = $1", fakeEndTime)
+		feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH end_time = $1, format=parquet", fakeEndTime)
 		defer closeFeed(t, feed)
 
 		assertPayloads(t, feed, []string{
@@ -7232,7 +7228,7 @@ func TestChangefeedEndTime(t *testing.T) {
 		}))
 	}
 
-	cdcTest(t, testFn, feedTestEnterpriseSinks)
+	cdcTest(t, testFn, feedTestForceSink("cloudstorage"))
 }
 
 func TestChangefeedEndTimeWithCursor(t *testing.T) {
diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go
index 97ee75b2348e..1e2a8a4d8b40 100644
--- a/pkg/ccl/changefeedccl/changefeedbase/options.go
+++ b/pkg/ccl/changefeedccl/changefeedbase/options.go
@@ -438,12 +438,7 @@ var InitialScanOnlyUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, Opt
 
 // ParquetFormatUnsupportedOptions is options that are not supported with the
 // parquet format.
-//
-// OptKeyInValue is disallowed because parquet files have no concept of key
-// columns, so there is no reason to emit duplicate key datums.
-//
-// TODO(#103129): add support for some of these
-var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptDiff, OptKeyInValue, OptTopicInValue)
+var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptTopicInValue)
 
 // AlterChangefeedUnsupportedOptions are changefeed options that we do not allow
 // users to alter.
diff --git a/pkg/ccl/changefeedccl/changefeedbase/options_test.go b/pkg/ccl/changefeedccl/changefeedbase/options_test.go
index 4eb5c62a0dbc..a5e8eb342e09 100644
--- a/pkg/ccl/changefeedccl/changefeedbase/options_test.go
+++ b/pkg/ccl/changefeedccl/changefeedbase/options_test.go
@@ -33,8 +33,8 @@ func TestOptionsValidations(t *testing.T) {
 	}{
 		{map[string]string{"format": "txt"}, false, "unknown format"},
 		{map[string]string{"initial_scan": "", "no_initial_scan": ""}, false, "cannot specify both"},
-		{map[string]string{"diff": "", "format": "parquet"}, false, "cannot specify both"},
 		{map[string]string{"format": "txt"}, true, "unknown format"},
 		{map[string]string{"initial_scan": "", "no_initial_scan": ""}, true, "cannot specify both"},
+		{map[string]string{"format": "parquet", "topic_in_value": ""}, false, "cannot specify both"},
 		// Verify that the returned error uses the syntax initial_scan='yes' instead of initial_scan_only. See #97008.
{map[string]string{"initial_scan_only": "", "resolved": ""}, true, "cannot specify both initial_scan='only'"}, {map[string]string{"initial_scan_only": "", "resolved": ""}, true, "cannot specify both initial_scan='only'"}, diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index 194f4011e234..857dfa3fd1c7 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -858,6 +858,9 @@ func randomSinkTypeWithOptions(options feedTestOptions) string { sinkWeights[sinkType] = 0 } } + if weight, ok := sinkWeights["cloudstorage"]; ok && weight != 0 { + sinkWeights = map[string]int{"cloudstorage": 1} + } weightTotal := 0 for _, weight := range sinkWeights { weightTotal += weight diff --git a/pkg/ccl/changefeedccl/parquet.go b/pkg/ccl/changefeedccl/parquet.go index 9e50c8360adb..95d5e16a51be 100644 --- a/pkg/ccl/changefeedccl/parquet.go +++ b/pkg/ccl/changefeedccl/parquet.go @@ -13,13 +13,16 @@ import ( "io" "strconv" "strings" + "time" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/json" "github.com/cockroachdb/cockroach/pkg/util/parquet" "github.com/cockroachdb/errors" ) @@ -33,6 +36,9 @@ type parquetWriter struct { encodingOpts changefeedbase.EncodingOptions schemaDef *parquet.SchemaDefinition datumAlloc []tree.Datum + + // Cached object builder for when using the `diff` option. + vb *json.FixedKeysObjectBuilder } // newParquetSchemaDefintion returns a parquet schema definition based on the @@ -68,6 +74,7 @@ func newParquetSchemaDefintion( const parquetOptUpdatedTimestampColName = metaSentinel + changefeedbase.OptUpdatedTimestamps const parquetOptMVCCTimestampColName = metaSentinel + changefeedbase.OptMVCCTimestamps +const parquetOptDiffColName = metaSentinel + "before" func appendMetadataColsToSchema( columnNames []string, columnTypes []*types.T, encodingOpts changefeedbase.EncodingOptions, @@ -80,6 +87,10 @@ func appendMetadataColsToSchema( columnNames = append(columnNames, parquetOptMVCCTimestampColName) columnTypes = append(columnTypes, types.String) } + if encodingOpts.Diff { + columnNames = append(columnNames, parquetOptDiffColName) + columnTypes = append(columnTypes, types.Json) + } return columnNames, columnTypes } @@ -118,7 +129,7 @@ func newParquetWriterFromRow( func (w *parquetWriter) addData( updatedRow cdcevent.Row, prevRow cdcevent.Row, updated, mvcc hlc.Timestamp, ) error { - if err := populateDatums(updatedRow, prevRow, w.encodingOpts, updated, mvcc, w.datumAlloc); err != nil { + if err := w.populateDatums(updatedRow, prevRow, updated, mvcc); err != nil { return err } @@ -131,14 +142,10 @@ func (w *parquetWriter) close() error { } // populateDatums writes the appropriate datums into the datumAlloc slice. 
-func populateDatums(
-	updatedRow cdcevent.Row,
-	prevRow cdcevent.Row,
-	encodingOpts changefeedbase.EncodingOptions,
-	updated, mvcc hlc.Timestamp,
-	datumAlloc []tree.Datum,
+func (w *parquetWriter) populateDatums(
+	updatedRow cdcevent.Row, prevRow cdcevent.Row, updated, mvcc hlc.Timestamp,
 ) error {
-	datums := datumAlloc[:0]
+	datums := w.datumAlloc[:0]
 
 	if err := updatedRow.ForAllColumns().Datum(func(d tree.Datum, _ cdcevent.ResultColumn) error {
 		datums = append(datums, d)
@@ -148,12 +155,47 @@ func populateDatums(
 	}
 	datums = append(datums, getEventTypeDatum(updatedRow, prevRow).DString())
 
-	if encodingOpts.UpdatedTimestamps {
+	if w.encodingOpts.UpdatedTimestamps {
 		datums = append(datums, tree.NewDString(timestampToString(updated)))
 	}
-	if encodingOpts.MVCCTimestamps {
+	if w.encodingOpts.MVCCTimestamps {
 		datums = append(datums, tree.NewDString(timestampToString(mvcc)))
 	}
+	if w.encodingOpts.Diff {
+		if prevRow.IsDeleted() {
+			datums = append(datums, tree.DNull)
+		} else {
+			if w.vb == nil {
+				keys := make([]string, 0, len(prevRow.ResultColumns()))
+				_ = prevRow.ForEachColumn().Col(func(col cdcevent.ResultColumn) error {
+					keys = append(keys, col.Name)
+					return nil
+				})
+				valueBuilder, err := json.NewFixedKeysObjectBuilder(keys)
+				if err != nil {
+					return err
+				}
+				w.vb = valueBuilder
+			}
+
+			if err := prevRow.ForEachColumn().Datum(func(d tree.Datum, col cdcevent.ResultColumn) error {
+				j, err := tree.AsJSON(d, sessiondatapb.DataConversionConfig{}, time.UTC)
+				if err != nil {
+					return err
+				}
+				return w.vb.Set(col.Name, j)
+			}); err != nil {
+				return err
+			}
+
+			j, err := w.vb.Build()
+			if err != nil {
+				return err
+			}
+			datums = append(datums, tree.NewDJSON(j))
+		}
+	}
+
 	return nil
 }
 
@@ -228,6 +270,11 @@ func addParquetTestMetadata(
 		valueCols[parquetOptMVCCTimestampColName] = idx
 		idx += 1
 	}
+	if encodingOpts.Diff {
+		valuesInOrder = append(valuesInOrder, parquetOptDiffColName)
+		valueCols[parquetOptDiffColName] = idx
+		idx += 1
+	}
 
 	parquetOpts = append(parquetOpts, parquet.WithMetadata(map[string]string{"keyCols": serializeMap(keysInOrder, keyCols)}))
 	parquetOpts = append(parquetOpts, parquet.WithMetadata(map[string]string{"allCols": serializeMap(valuesInOrder, valueCols)}))
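A note on the cached `vb` builder introduced above: `json.NewFixedKeysObjectBuilder` is keyed once per schema and then reused across rows via `Set`/`Build`, avoiding per-row key-set allocation. A minimal standalone sketch of that reuse pattern, using only the `util/json` calls that appear in this change (the `main` wrapper and key names are illustrative, not part of the patch):

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util/json"
)

func main() {
	// The key set is fixed (like a table's columns), so one builder is
	// allocated and reused for every row, mirroring parquetWriter.vb.
	vb, err := json.NewFixedKeysObjectBuilder([]string{"a", "b"})
	if err != nil {
		panic(err)
	}
	for i := 0; i < 3; i++ {
		if err := vb.Set("a", json.FromInt(i)); err != nil {
			panic(err)
		}
		if err := vb.Set("b", json.FromString("prev")); err != nil {
			panic(err)
		}
		j, err := vb.Build() // The builder can be reused after Build.
		if err != nil {
			panic(err)
		}
		fmt.Println(j) // e.g. {"a": 0, "b": "prev"}
	}
}
```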
diff --git a/pkg/ccl/changefeedccl/parquet_test.go b/pkg/ccl/changefeedccl/parquet_test.go
index a55fd3ca8839..4f970523ce58 100644
--- a/pkg/ccl/changefeedccl/parquet_test.go
+++ b/pkg/ccl/changefeedccl/parquet_test.go
@@ -30,6 +30,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/parquet"
+	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/require"
 )
@@ -54,29 +55,39 @@ func TestParquetRows(t *testing.T) {
 	sqlDB := sqlutils.MakeSQLRunner(db)
 
 	for _, tc := range []struct {
-		testName    string
-		createTable string
-		inserts     []string
+		testName          string
+		createTable       string
+		stmts             []string
+		expectedDatumRows [][]tree.Datum
 	}{
 		{
 			testName: "mixed",
 			createTable: `CREATE TABLE foo (
 				int32Col INT4 PRIMARY KEY,
-				varCharCol VARCHAR(16) ,
-				charCol CHAR(2),
-				tsCol TIMESTAMP ,
 				stringCol STRING ,
-				decimalCOl DECIMAL(12,2),
 				uuidCol UUID
 			)`,
-			inserts: []string{
-				`INSERT INTO foo values (0, 'zero', 'CA', now(), 'oiwjfoijsdjif', 'inf', gen_random_uuid())`,
-				`INSERT INTO foo values (1, 'one', 'NY', now(), 'sdi9fu90d', '-1.90', gen_random_uuid())`,
-				`INSERT INTO foo values (2, 'two', 'WA', now(), 'sd9fid9fuj', '0.01', gen_random_uuid())`,
-				`INSERT INTO foo values (3, 'three', 'ON', now(), 'sadklfhkdlsjf', '1.2', gen_random_uuid())`,
-				`INSERT INTO foo values (4, 'four', 'NS', now(), '123123', '-11222221.2', gen_random_uuid())`,
-				`INSERT INTO foo values (5, 'five', 'BC', now(), 'sadklfhkdlsjf', '1.2', gen_random_uuid())`,
-				`INSERT INTO foo values (6, 'siz', 'AB', now(), '123123', '-11222221.2', gen_random_uuid())`,
+			stmts: []string{
+				`INSERT INTO foo VALUES (0, 'a1', '2fec7a4b-0a78-40ce-92e0-d1c0fac70436')`,
+				`INSERT INTO foo VALUES (1, 'b1', '0ce43188-e4a9-4b73-803b-a253abc57e6b')`,
+				`INSERT INTO foo VALUES (2, 'c1', '5a02bd48-ba64-4134-9199-844c1517f722')`,
+				`UPDATE foo SET stringCol = 'changed' WHERE int32Col = 1`,
+				`DELETE FROM foo WHERE int32Col = 0`,
+			},
+			expectedDatumRows: [][]tree.Datum{
+				{tree.NewDInt(0), tree.NewDString("a1"),
+					&tree.DUuid{UUID: uuid.FromStringOrNil("2fec7a4b-0a78-40ce-92e0-d1c0fac70436")},
+					parquetEventTypeDatumStringMap[parquetEventInsert]},
+				{tree.NewDInt(1), tree.NewDString("b1"),
+					&tree.DUuid{UUID: uuid.FromStringOrNil("0ce43188-e4a9-4b73-803b-a253abc57e6b")},
+					parquetEventTypeDatumStringMap[parquetEventInsert]},
+				{tree.NewDInt(2), tree.NewDString("c1"),
+					&tree.DUuid{UUID: uuid.FromStringOrNil("5a02bd48-ba64-4134-9199-844c1517f722")},
+					parquetEventTypeDatumStringMap[parquetEventInsert]},
+				{tree.NewDInt(1), tree.NewDString("changed"),
+					&tree.DUuid{UUID: uuid.FromStringOrNil("0ce43188-e4a9-4b73-803b-a253abc57e6b")},
+					parquetEventTypeDatumStringMap[parquetEventUpdate]},
+				{tree.NewDInt(0), tree.DNull, tree.DNull, parquetEventTypeDatumStringMap[parquetEventDelete]},
 			},
 		},
 	} {
@@ -104,8 +115,8 @@ func TestParquetRows(t *testing.T) {
 			}
 		}()
 
-			numRows := len(tc.inserts)
-			for _, insertStmt := range tc.inserts {
-				sqlDB.Exec(t, insertStmt)
+			numRows := len(tc.stmts)
+			for _, stmt := range tc.stmts {
+				sqlDB.Exec(t, stmt)
 			}
 
@@ -135,11 +146,7 @@ func TestParquetRows(t *testing.T) {
 				err = writer.addData(updatedRow, prevRow, hlc.Timestamp{}, hlc.Timestamp{})
 				require.NoError(t, err)
 
-				// Save a copy of the datums we wrote.
-				datumRow := make([]tree.Datum, writer.schemaDef.NumColumns())
-				err = populateDatums(updatedRow, prevRow, encodingOpts, hlc.Timestamp{}, hlc.Timestamp{}, datumRow)
-				require.NoError(t, err)
-				datums[i] = datumRow
+				datums[i] = tc.expectedDatumRows[i]
 			}
 
 			err = writer.close()
diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go
index c5df6e32538d..42b8d8ce42f9 100644
--- a/pkg/ccl/changefeedccl/testfeed_test.go
+++ b/pkg/ccl/changefeedccl/testfeed_test.go
@@ -1050,41 +1050,46 @@ func (f *cloudFeedFactory) Feed(
 			tree.KVOption{Key: changefeedbase.OptKeyInValue},
 		)
 	}
-	// Determine if we can enable the parquet format if the changefeed is not
-	// being created with incompatible options. If it can be enabled, we will use
-	// parquet format with a probability of 0.4.
-	parquetPossible := includeParquestTestMetadata
+
+	formatSpecified := false
 	explicitEnvelope := false
 	for _, opt := range createStmt.Options {
+		if string(opt.Key) == changefeedbase.OptFormat {
+			formatSpecified = true
+		}
 		if string(opt.Key) == changefeedbase.OptEnvelope {
 			explicitEnvelope = true
 		}
-		if string(opt.Key) == changefeedbase.OptFormat &&
-			opt.Value.String() != string(changefeedbase.OptFormatParquet) {
+	}
+
+	if !formatSpecified {
+		// Determine if we can enable the parquet format if the changefeed is not
+		// being created with incompatible options. If it can be enabled, we will
+		// use the parquet format with a probability of 0.4.
+		parquetPossible := includeParquestTestMetadata
+		for _, opt := range createStmt.Options {
+			for o := range changefeedbase.ParquetFormatUnsupportedOptions {
+				if o == string(opt.Key) {
+					parquetPossible = false
+					break
+				}
+			}
+		}
+		randNum := rand.Intn(5)
+		if randNum < 3 {
 			parquetPossible = false
-			break
 		}
-		for o := range changefeedbase.ParquetFormatUnsupportedOptions {
-			if o == string(opt.Key) {
-				parquetPossible = false
-				break
-			}
+		if parquetPossible {
+			log.Infof(context.Background(), "using parquet format")
+			createStmt.Options = append(
+				createStmt.Options,
+				tree.KVOption{
+					Key:   changefeedbase.OptFormat,
+					Value: tree.NewStrVal(string(changefeedbase.OptFormatParquet)),
+				},
+			)
 		}
 	}
-	randNum := rand.Intn(5)
-	if randNum < 2 {
-		parquetPossible = false
-	}
-	if parquetPossible {
-		log.Infof(context.Background(), "using parquet format")
-		createStmt.Options = append(
-			createStmt.Options,
-			tree.KVOption{
-				Key:   changefeedbase.OptFormat,
-				Value: tree.NewStrVal(string(changefeedbase.OptFormatParquet)),
-			},
-		)
-	}
 
 	feedDir := feedSubDir()
 	sinkURI := `nodelocal://1/` + feedDir
@@ -1257,6 +1262,8 @@ func (c *cloudFeed) appendParquetTestFeedMessages(
 			metaColumnNameSet[colName] = colIdx
 		case parquetOptMVCCTimestampColName:
 			metaColumnNameSet[colName] = colIdx
+		case parquetOptDiffColName:
+			metaColumnNameSet[colName] = colIdx
 		default:
 		}
 	}
@@ -1326,6 +1333,13 @@ func (c *cloudFeed) appendParquetTestFeedMessages(
 			}
 			valueWithAfter.Add(changefeedbase.OptMVCCTimestamps, j)
 		}
+		if diffColIdx, ok := metaColumnNameSet[parquetOptDiffColName]; ok {
+			j, err := tree.AsJSON(row[diffColIdx], sessiondatapb.DataConversionConfig{}, time.UTC)
+			if err != nil {
+				return err
+			}
+			valueWithAfter.Add("before", j)
+		}
 	}
 
 	keyJSON := keyJSONBuilder.Build()
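With the test-feed changes above, the metaSentinel-prefixed `before` column read back from a parquet file is folded into the usual wrapped envelope, so parquet feeds can be asserted with the same expected strings as JSON feeds. A hedged sketch of such an assertion, in the style of the tests in this patch (table, key, and values are illustrative only):

```go
// Inside a cdcTest body, after creating a changefeed on a cloudstorage
// sink with `CREATE CHANGEFEED FOR foo WITH diff, format=parquet`:
assertPayloads(t, foo, []string{
	`foo: [0]->{"after": {"a": 0, "b": "updated"}, "before": {"a": 0, "b": "initial"}}`,
})
```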
diff --git a/pkg/util/parquet/testutils.go b/pkg/util/parquet/testutils.go
index bbc7c38d4489..d6f88bfc10ad 100644
--- a/pkg/util/parquet/testutils.go
+++ b/pkg/util/parquet/testutils.go
@@ -119,6 +119,7 @@ func ReadFile(parquetFile string) (meta ReadDatumsMetadata, datums [][]tree.Datu
 		return ReadDatumsMetadata{}, nil, err
 	}
 
+	var colNames []string
 	startingRowIdx := 0
 	for rg := 0; rg < reader.NumRowGroups(); rg++ {
 		rgr := reader.RowGroup(rg)
@@ -126,6 +127,9 @@ func ReadFile(parquetFile string) (meta ReadDatumsMetadata, datums [][]tree.Datu
 		for i := int64(0); i < rowsInRowGroup; i++ {
 			readDatums = append(readDatums, make([]tree.Datum, rgr.NumColumns()))
 		}
+		if rg == 0 {
+			colNames = make([]string, 0, rgr.NumColumns())
+		}
 
 		for colIdx := 0; colIdx < rgr.NumColumns(); colIdx++ {
 			col, err := rgr.Column(colIdx)
@@ -133,6 +137,10 @@ func ReadFile(parquetFile string) (meta ReadDatumsMetadata, datums [][]tree.Datu
 				return ReadDatumsMetadata{}, nil, err
 			}
 
+			if rg == 0 {
+				colNames = append(colNames, col.Descriptor().Name())
+			}
+
 			dec, err := decoderFromFamilyAndType(oid.Oid(typOids[colIdx]), types.Family(typFamilies[colIdx]))
 			if err != nil {
 				return ReadDatumsMetadata{}, nil, err
@@ -154,7 +162,7 @@ func ReadFile(parquetFile string) (meta ReadDatumsMetadata, datums [][]tree.Datu
 	}
 
 	for i := 0; i < len(readDatums); i++ {
-		readDatums[i] = squashTuples(readDatums[i], tupleColumns)
+		readDatums[i] = squashTuples(readDatums[i], tupleColumns, colNames)
 	}
 
 	return makeDatumMeta(reader, readDatums), readDatums, nil
@@ -411,11 +419,14 @@ func ValidateDatum(t *testing.T, expected tree.Datum, actual tree.Datum) {
 			ValidateDatum(t, arr1[i], arr2[i])
 		}
 	case types.TupleFamily:
-		arr1 := expected.(*tree.DTuple).D
-		arr2 := actual.(*tree.DTuple).D
-		require.Equal(t, len(arr1), len(arr2))
-		for i := 0; i < len(arr1); i++ {
-			ValidateDatum(t, arr1[i], arr2[i])
+		t1 := expected.(*tree.DTuple)
+		t2 := actual.(*tree.DTuple)
+		require.Equal(t, len(t1.D), len(t2.D))
+		for i := 0; i < len(t1.D); i++ {
+			ValidateDatum(t, t1.D[i], t2.D[i])
+		}
+		if t1.ResolvedType().TupleLabels() != nil {
+			require.Equal(t, t1.ResolvedType().TupleLabels(), t2.ResolvedType().TupleLabels())
 		}
 	case types.EnumFamily:
 		require.Equal(t, expected.(*tree.DEnum).LogicalRep, actual.(*tree.DEnum).LogicalRep)
@@ -544,14 +555,25 @@ type dNullTupleType struct {
 }
 
 // squashTuples takes an array of datums and merges groups of adjacent datums
-// into tuples using the passed intervals. Example:
+// into tuples using the supplied intervals. The provided column names will be
+// used as tuple labels.
+//
+// For example:
+//
+// Input:
 //
-// Input: ["0", "1", "2", "3", "4", "5", "6"] [[0, 1], [3, 3], [4, 6]]
-// Output: [("0", "1"), "2", ("3"), ("4", "5", "6")]
+//	datumRow = ["0", "1", "2", "3", "4", "5", "6"]
+//	tupleColIndexes = [[0, 1], [3, 3], [4, 6]]
+//	colNames = ["a", "b", "c", "d", "e", "f", "g"]
+//
+// Output:
+//
+//	[(a: "0", b: "1"), "2", (d: "3"), (e: "4", f: "5", g: "6")]
 //
 // Behavior is undefined if the intervals are not sorted, not disjoint,
-// not ascending, or out of bounds.
-func squashTuples(datumRow []tree.Datum, tupleColIndexes [][]int) []tree.Datum {
+// not ascending, or out of bounds. The number of elements in datumRow
+// should be equal to the number of labels in colNames.
+func squashTuples(datumRow []tree.Datum, tupleColIndexes [][]int, colNames []string) []tree.Datum {
 	if len(tupleColIndexes) == 0 {
 		return datumRow
 	}
@@ -559,6 +581,7 @@ func squashTuples(datumRow []tree.Datum, tupleColIndexes [][]int) []tree.Datum {
 	var updatedDatums []tree.Datum
 	var currentTupleDatums []tree.Datum
 	var currentTupleTypes []*types.T
+	var currentTupleLabels []string
 	for i, d := range datumRow {
 		if tupleIdx < len(tupleColIndexes) {
 			tupleUpperIdx := tupleColIndexes[tupleIdx][1]
@@ -567,18 +590,19 @@ func squashTuples(datumRow []tree.Datum, tupleColIndexes [][]int) []tree.Datum {
 			if i >= tupleLowerIdx && i <= tupleUpperIdx {
 				currentTupleDatums = append(currentTupleDatums, d)
 				currentTupleTypes = append(currentTupleTypes, d.ResolvedType())
-
+				currentTupleLabels = append(currentTupleLabels, colNames[i])
 				if i == tupleUpperIdx {
 					// Check for marker that indicates the entire tuple is NULL.
 					if currentTupleDatums[0] == dNullTuple {
 						updatedDatums = append(updatedDatums, tree.DNull)
 					} else {
-						tupleDatum := tree.MakeDTuple(types.MakeTuple(currentTupleTypes), currentTupleDatums...)
+						tupleDatum := tree.MakeDTuple(types.MakeLabeledTuple(currentTupleTypes, currentTupleLabels), currentTupleDatums...)
 						updatedDatums = append(updatedDatums, &tupleDatum)
 					}
 					currentTupleTypes = []*types.T{}
 					currentTupleDatums = []tree.Datum{}
+					currentTupleLabels = []string{}
 
 					tupleIdx += 1
 				}
diff --git a/pkg/util/parquet/writer_test.go b/pkg/util/parquet/writer_test.go
index bfcf7978cac6..7ef4ba569384 100644
--- a/pkg/util/parquet/writer_test.go
+++ b/pkg/util/parquet/writer_test.go
@@ -623,6 +623,7 @@ func optionsTest(t *testing.T, opt Option, testFn func(t *testing.T, reader *fil
 	err = reader.Close()
 	require.NoError(t, err)
 }
+
 func TestSquashTuples(t *testing.T) {
 	datums := []tree.Datum{
 		tree.NewDInt(1),
@@ -634,6 +635,7 @@ func TestSquashTuples(t *testing.T) {
 		tree.NewDJSON(json.FromBool(false)),
 		tree.NewDInt(0),
 	}
+	labels := []string{"a", "b", "c", "d", "e", "f", "g", "h"}
 
 	for _, tc := range []struct {
 		tupleIntervals [][]int
@@ -645,18 +647,18 @@ func TestSquashTuples(t *testing.T) {
 		},
 		{
 			tupleIntervals: [][]int{{0, 1}, {2, 4}},
-			tupleOutput:    "[(1, 'string') ('\\x6279746573', '52fdfc07-2182-454f-963f-5f0f9a621d72', '1') 0.1 'false' 0]",
+			tupleOutput:    "[((1, 'string') AS a, b) (('\\x6279746573', '52fdfc07-2182-454f-963f-5f0f9a621d72', '1') AS c, d, e) 0.1 'false' 0]",
 		},
 		{
 			tupleIntervals: [][]int{{0, 2}, {3, 3}},
-			tupleOutput:    "[(1, 'string', '\\x6279746573') ('52fdfc07-2182-454f-963f-5f0f9a621d72',) '1' 0.1 'false' 0]",
+			tupleOutput:    "[((1, 'string', '\\x6279746573') AS a, b, c) (('52fdfc07-2182-454f-963f-5f0f9a621d72',) AS d) '1' 0.1 'false' 0]",
 		},
 		{
 			tupleIntervals: [][]int{{0, 7}},
-			tupleOutput:    "[(1, 'string', '\\x6279746573', '52fdfc07-2182-454f-963f-5f0f9a621d72', '1', 0.1, 'false', 0)]",
+			tupleOutput:    "[((1, 'string', '\\x6279746573', '52fdfc07-2182-454f-963f-5f0f9a621d72', '1', 0.1, 'false', 0) AS a, b, c, d, e, f, g, h)]",
 		},
 	} {
-		squashedDatums := squashTuples(datums, tc.tupleIntervals)
+		squashedDatums := squashTuples(datums, tc.tupleIntervals, labels)
 		require.Equal(t, tc.tupleOutput, fmt.Sprint(squashedDatums))
 	}
 }
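Taken together: `end_time`, `diff`, and `key_in_value` are no longer rejected with `format=parquet`; only `topic_in_value` remains disallowed. A sketch of both sides of that boundary, modeled on the tests above (`sinkURI` and the table are placeholders):

```go
// Newly supported: the diff option together with the parquet format.
sqlDB.Exec(t, `CREATE CHANGEFEED FOR foo INTO $1 WITH format=parquet, diff`, sinkURI)

// Still rejected, matching the options_test.go case above.
sqlDB.ExpectErr(t, `cannot specify both`,
	`CREATE CHANGEFEED FOR foo INTO $1 WITH format=parquet, topic_in_value`, sinkURI)
```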