Skip to content

Commit

Permalink
util/parquet: support tuple labels in util/parquet testutils
Browse files Browse the repository at this point in the history
Previously, the test utilities in `util/parquet` would not reconstruct
tuples read from files with their labels. This change updates the
package to do so. This is required for testing in users of this
package such as CDC.

Informs: cockroachdb#99028
Epic: CRDB-27372
Release note: None
  • Loading branch information
jayshrivastava committed Jun 21, 2023
1 parent 8dde1c4 commit cf44c1d
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 17 deletions.
50 changes: 37 additions & 13 deletions pkg/util/parquet/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,20 +119,28 @@ func ReadFile(parquetFile string) (meta ReadDatumsMetadata, datums [][]tree.Datu
return ReadDatumsMetadata{}, nil, err
}

var colNames []string
startingRowIdx := 0
for rg := 0; rg < reader.NumRowGroups(); rg++ {
rgr := reader.RowGroup(rg)
rowsInRowGroup := rgr.NumRows()
for i := int64(0); i < rowsInRowGroup; i++ {
readDatums = append(readDatums, make([]tree.Datum, rgr.NumColumns()))
}
if rg == 0 {
colNames = make([]string, 0, rgr.NumColumns())
}

for colIdx := 0; colIdx < rgr.NumColumns(); colIdx++ {
col, err := rgr.Column(colIdx)
if err != nil {
return ReadDatumsMetadata{}, nil, err
}

if rg == 0 {
colNames = append(colNames, col.Descriptor().Name())
}

dec, err := decoderFromFamilyAndType(oid.Oid(typOids[colIdx]), types.Family(typFamilies[colIdx]))
if err != nil {
return ReadDatumsMetadata{}, nil, err
Expand All @@ -154,7 +162,7 @@ func ReadFile(parquetFile string) (meta ReadDatumsMetadata, datums [][]tree.Datu
}

for i := 0; i < len(readDatums); i++ {
readDatums[i] = squashTuples(readDatums[i], tupleColumns)
readDatums[i] = squashTuples(readDatums[i], tupleColumns, colNames)
}

return makeDatumMeta(reader, readDatums), readDatums, nil
Expand Down Expand Up @@ -411,11 +419,14 @@ func ValidateDatum(t *testing.T, expected tree.Datum, actual tree.Datum) {
ValidateDatum(t, arr1[i], arr2[i])
}
case types.TupleFamily:
arr1 := expected.(*tree.DTuple).D
arr2 := actual.(*tree.DTuple).D
require.Equal(t, len(arr1), len(arr2))
for i := 0; i < len(arr1); i++ {
ValidateDatum(t, arr1[i], arr2[i])
t1 := expected.(*tree.DTuple)
t2 := actual.(*tree.DTuple)
require.Equal(t, len(t1.D), len(t2.D))
for i := 0; i < len(t1.D); i++ {
ValidateDatum(t, t1.D[i], t2.D[i])
}
if t1.ResolvedType().TupleLabels() != nil {
require.Equal(t, t1.ResolvedType().TupleLabels(), t2.ResolvedType().TupleLabels())
}
case types.EnumFamily:
require.Equal(t, expected.(*tree.DEnum).LogicalRep, actual.(*tree.DEnum).LogicalRep)
Expand Down Expand Up @@ -544,21 +555,33 @@ type dNullTupleType struct {
}

// squashTuples takes an array of datums and merges groups of adjacent datums
// into tuples using the passed intervals. Example:
// into tuples using the supplied intervals. The provided column names will be
// used as tuple labels.
//
// For example:
//
// Input:
//
// Input: ["0", "1", "2", "3", "4", "5", "6"] [[0, 1], [3, 3], [4, 6]]
// Output: [("0", "1"), "2", ("3"), ("4", "5", "6")]
// datumRow = ["0", "1", "2", "3", "4", "5", "6"]
// tupleColIndexes = [[0, 1], [3, 3], [4, 6]]
// colNames = ["a", "b", "c", "d", "e", "f", "g"]
//
// Output:
//
// [(a: "0", b: "1"), "2", (d: "3"), (e: "4", f: "5", g: "6")]
//
// Behavior is undefined if the intervals are not sorted, not disjoint,
// not ascending, or out of bounds.
func squashTuples(datumRow []tree.Datum, tupleColIndexes [][]int) []tree.Datum {
// not ascending, or out of bounds. The number of elements in datumRow
// should be equal to the number of labels in colNames.
func squashTuples(datumRow []tree.Datum, tupleColIndexes [][]int, colNames []string) []tree.Datum {
if len(tupleColIndexes) == 0 {
return datumRow
}
tupleIdx := 0
var updatedDatums []tree.Datum
var currentTupleDatums []tree.Datum
var currentTupleTypes []*types.T
var currentTupleLabels []string
for i, d := range datumRow {
if tupleIdx < len(tupleColIndexes) {
tupleUpperIdx := tupleColIndexes[tupleIdx][1]
Expand All @@ -567,18 +590,19 @@ func squashTuples(datumRow []tree.Datum, tupleColIndexes [][]int) []tree.Datum {
if i >= tupleLowerIdx && i <= tupleUpperIdx {
currentTupleDatums = append(currentTupleDatums, d)
currentTupleTypes = append(currentTupleTypes, d.ResolvedType())

currentTupleLabels = append(currentTupleLabels, colNames[i])
if i == tupleUpperIdx {
// Check for marker that indicates the entire tuple is NULL.
if currentTupleDatums[0] == dNullTuple {
updatedDatums = append(updatedDatums, tree.DNull)
} else {
tupleDatum := tree.MakeDTuple(types.MakeTuple(currentTupleTypes), currentTupleDatums...)
tupleDatum := tree.MakeDTuple(types.MakeLabeledTuple(currentTupleTypes, currentTupleLabels), currentTupleDatums...)
updatedDatums = append(updatedDatums, &tupleDatum)
}

currentTupleTypes = []*types.T{}
currentTupleDatums = []tree.Datum{}
currentTupleLabels = []string{}

tupleIdx += 1
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/util/parquet/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,7 @@ func optionsTest(t *testing.T, opt Option, testFn func(t *testing.T, reader *fil
err = reader.Close()
require.NoError(t, err)
}

func TestSquashTuples(t *testing.T) {
datums := []tree.Datum{
tree.NewDInt(1),
Expand All @@ -634,6 +635,7 @@ func TestSquashTuples(t *testing.T) {
tree.NewDJSON(json.FromBool(false)),
tree.NewDInt(0),
}
labels := []string{"a", "b", "c", "d", "e", "f", "g", "h"}

for _, tc := range []struct {
tupleIntervals [][]int
Expand All @@ -645,18 +647,18 @@ func TestSquashTuples(t *testing.T) {
},
{
tupleIntervals: [][]int{{0, 1}, {2, 4}},
tupleOutput: "[(1, 'string') ('\\x6279746573', '52fdfc07-2182-454f-963f-5f0f9a621d72', '1') 0.1 'false' 0]",
tupleOutput: "[((1, 'string') AS a, b) (('\\x6279746573', '52fdfc07-2182-454f-963f-5f0f9a621d72', '1') AS c, d, e) 0.1 'false' 0]",
},
{
tupleIntervals: [][]int{{0, 2}, {3, 3}},
tupleOutput: "[(1, 'string', '\\x6279746573') ('52fdfc07-2182-454f-963f-5f0f9a621d72',) '1' 0.1 'false' 0]",
tupleOutput: "[((1, 'string', '\\x6279746573') AS a, b, c) (('52fdfc07-2182-454f-963f-5f0f9a621d72',) AS d) '1' 0.1 'false' 0]",
},
{
tupleIntervals: [][]int{{0, 7}},
tupleOutput: "[(1, 'string', '\\x6279746573', '52fdfc07-2182-454f-963f-5f0f9a621d72', '1', 0.1, 'false', 0)]",
tupleOutput: "[((1, 'string', '\\x6279746573', '52fdfc07-2182-454f-963f-5f0f9a621d72', '1', 0.1, 'false', 0) AS a, b, c, d, e, f, g, h)]",
},
} {
squashedDatums := squashTuples(datums, tc.tupleIntervals)
squashedDatums := squashTuples(datums, tc.tupleIntervals, labels)
require.Equal(t, tc.tupleOutput, fmt.Sprint(squashedDatums))
}
}

0 comments on commit cf44c1d

Please sign in to comment.