Skip to content

Commit

Permalink
util/parquet: add support for arrays
Browse files Browse the repository at this point in the history
This change extends and refactors the util/parquet library to
be able to read and write arrays.

Release note: None

Informs: #99028
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071
  • Loading branch information
jayshrivastava committed Apr 19, 2023
1 parent 3df495f commit adf8c1a
Show file tree
Hide file tree
Showing 6 changed files with 367 additions and 126 deletions.
95 changes: 66 additions & 29 deletions pkg/util/parquet/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ import (
"github.com/lib/pq/oid"
)

// Setting parquet.Repetitions.Optional makes a parquet column nullable. When
// writing a datum, we will always specify a definition level to indicate if the
// datum is null or not. See comments on nonNilDefLevel or nilDefLevel for more info.
var defaultRepetitions = parquet.Repetitions.Optional

// A schema field is an internal identifier for schema nodes used by the parquet library.
// A value of -1 will let the library auto-assign values. This does not affect reading
// or writing parquet files.
Expand All @@ -35,10 +40,11 @@ const defaultTypeLength = -1

// A column stores column metadata.
type column struct {
node schema.Node
colWriter writeFn
decoder decoder
typ *types.T
node schema.Node
writeInvoker writeInvoker
writeFn writeFn
decoder decoder
typ *types.T
}

// A SchemaDefinition stores a parquet schema.
Expand Down Expand Up @@ -67,7 +73,7 @@ func NewSchema(columnNames []string, columnTypes []*types.T) (*SchemaDefinition,
fields := make([]schema.Node, 0)

for i := 0; i < len(columnNames); i++ {
parquetCol, err := makeColumn(columnNames[i], columnTypes[i])
parquetCol, err := makeColumn(columnNames[i], columnTypes[i], defaultRepetitions)
if err != nil {
return nil, err
}
Expand All @@ -87,50 +93,48 @@ func NewSchema(columnNames []string, columnTypes []*types.T) (*SchemaDefinition,
}

// makeColumn constructs a column.
func makeColumn(colName string, typ *types.T) (column, error) {
// Setting parquet.Repetitions.Optional makes parquet interpret all columns as nullable.
// When writing data, we will specify a definition level of 0 (null) or 1 (not null).
// See https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet
// for more information regarding definition levels.
defaultRepetitions := parquet.Repetitions.Optional

func makeColumn(colName string, typ *types.T, repetitions parquet.Repetition) (column, error) {
result := column{typ: typ}
var err error
switch typ.Family() {
case types.BoolFamily:
result.node = schema.NewBooleanNode(colName, defaultRepetitions, defaultSchemaFieldID)
result.colWriter = writeBool
result.node = schema.NewBooleanNode(colName, repetitions, defaultSchemaFieldID)
result.writeInvoker = writeScalar
result.writeFn = writeBool
result.decoder = boolDecoder{}
result.typ = types.Bool
return result, nil
case types.StringFamily:
result.node, err = schema.NewPrimitiveNodeLogical(colName,
defaultRepetitions, schema.StringLogicalType{}, parquet.Types.ByteArray,
repetitions, schema.StringLogicalType{}, parquet.Types.ByteArray,
defaultTypeLength, defaultSchemaFieldID)

if err != nil {
return result, err
}
result.colWriter = writeString
result.writeInvoker = writeScalar
result.writeFn = writeString
result.decoder = stringDecoder{}
return result, nil
case types.IntFamily:
// Note: integer datums are always signed: https://www.cockroachlabs.com/docs/stable/int.html
if typ.Oid() == oid.T_int8 {
result.node, err = schema.NewPrimitiveNodeLogical(colName,
defaultRepetitions, schema.NewIntLogicalType(64, true),
repetitions, schema.NewIntLogicalType(64, true),
parquet.Types.Int64, defaultTypeLength,
defaultSchemaFieldID)
if err != nil {
return result, err
}
result.colWriter = writeInt64
result.writeInvoker = writeScalar
result.writeFn = writeInt64
result.decoder = int64Decoder{}
return result, nil
}

result.node = schema.NewInt32Node(colName, defaultRepetitions, defaultSchemaFieldID)
result.colWriter = writeInt32
result.node = schema.NewInt32Node(colName, repetitions, defaultSchemaFieldID)
result.writeInvoker = writeScalar
result.writeFn = writeInt32
result.decoder = int32Decoder{}
return result, nil
case types.DecimalFamily:
Expand All @@ -149,37 +153,71 @@ func makeColumn(colName string, typ *types.T) (column, error) {
}

result.node, err = schema.NewPrimitiveNodeLogical(colName,
defaultRepetitions, schema.NewDecimalLogicalType(precision,
repetitions, schema.NewDecimalLogicalType(precision,
scale), parquet.Types.ByteArray, defaultTypeLength,
defaultSchemaFieldID)
if err != nil {
return result, err
}
result.colWriter = writeDecimal
result.writeInvoker = writeScalar
result.writeFn = writeDecimal
result.decoder = decimalDecoder{}
return result, nil
case types.UuidFamily:
result.node, err = schema.NewPrimitiveNodeLogical(colName,
defaultRepetitions, schema.UUIDLogicalType{},
repetitions, schema.UUIDLogicalType{},
parquet.Types.FixedLenByteArray, uuid.Size, defaultSchemaFieldID)
if err != nil {
return result, err
}
result.colWriter = writeUUID
result.writeInvoker = writeScalar
result.writeFn = writeUUID
result.decoder = uUIDDecoder{}
return result, nil
case types.TimestampFamily:
// Note that all timestamp datums are in UTC: https://www.cockroachlabs.com/docs/stable/timestamp.html
result.node, err = schema.NewPrimitiveNodeLogical(colName,
defaultRepetitions, schema.StringLogicalType{}, parquet.Types.ByteArray,
repetitions, schema.StringLogicalType{}, parquet.Types.ByteArray,
defaultTypeLength, defaultSchemaFieldID)
if err != nil {
return result, err
}

result.colWriter = writeTimestamp
result.writeInvoker = writeScalar
result.writeFn = writeTimestamp
result.decoder = timestampDecoder{}
return result, nil
case types.ArrayFamily:
// Arrays for type T are represented by the following:
// message schema { -- toplevel schema
// optional group a (LIST) { -- list column
// repeated group list {
// optional T element;
// }
// }
// }
// Representing arrays this way makes it easier to differentiate NULL, [NULL],
// and [] when encoding.
// There is more info about encoding arrays here:
// https://arrow.apache.org/blog/2022/10/08/arrow-parquet-encoding-part-2/
elementCol, err := makeColumn("element", typ.ArrayContents(), parquet.Repetitions.Optional)
if err != nil {
return result, err
}
innerListFields := []schema.Node{elementCol.node}
innerListNode, err := schema.NewGroupNode("list", parquet.Repetitions.Repeated, innerListFields, defaultSchemaFieldID)
if err != nil {
return result, err
}
outerListFields := []schema.Node{innerListNode}
result.node, err = schema.NewGroupNodeLogical(colName, parquet.Repetitions.Optional, outerListFields, schema.ListLogicalType{}, defaultSchemaFieldID)
if err != nil {
return result, err
}
result.decoder = elementCol.decoder
result.writeInvoker = writeArray
result.writeFn = elementCol.writeFn
result.typ = elementCol.typ
return result, nil

// TODO(#99028): implement support for the remaining types.
// case types.INetFamily:
Expand All @@ -196,8 +234,7 @@ func makeColumn(colName string, typ *types.T) (column, error) {
// case types.TimeTZFamily:
// case types.IntervalFamily:
// case types.TimestampTZFamily:
// case types.ArrayFamily:
default:
return result, pgerror.Newf(pgcode.FeatureNotSupported, "parquet export does not support the %v type yet", typ.Family())
return result, pgerror.Newf(pgcode.FeatureNotSupported, "parquet export does not support the %v type", typ.Family())
}
}
Loading

0 comments on commit adf8c1a

Please sign in to comment.