Skip to content

Commit

Permalink
schema conversion fixes
Browse files Browse the repository at this point in the history
to match apacheGH-36760: [Go] Adding avro ocf reader - schema converter apache#36796
  • Loading branch information
loicalleyne committed Sep 6, 2023
1 parent bfa5b42 commit a7acd32
Showing 1 changed file with 66 additions and 50 deletions.
116 changes: 66 additions & 50 deletions go/arrow/avro/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,38 +24,42 @@ import (
"strconv"

"github.com/apache/arrow/go/v14/arrow"
"github.com/apache/arrow/go/v14/arrow/decimal128"
"github.com/apache/arrow/go/v14/internal/types"
avro "github.com/hamba/avro/v2"
)

type schemaNode struct {
fieldname string
name string
parent *schemaNode
schema avro.Schema
union bool
childrens []*schemaNode
children []*schemaNode
arrowField arrow.Field
schemaCache *avro.SchemaCache
path []string
index, depth int32
}

var schemaCache avro.SchemaCache

func newSchemaNode() *schemaNode { return &schemaNode{fieldname: "", index: -1} }
func newSchemaNode() *schemaNode {
var schemaCache avro.SchemaCache
return &schemaNode{name: "", index: -1, schemaCache: &schemaCache}
}

func (node *schemaNode) newChild(n string, s avro.Schema) *schemaNode {
child := &schemaNode{
fieldname: n,
parent: node,
schema: s,
index: int32(len(node.childrens)),
depth: node.depth + 1}
node.childrens = append(node.childrens, child)
name: n,
parent: node,
schema: s,
schemaCache: node.schemaCache,
index: int32(len(node.children)),
depth: node.depth + 1}
node.children = append(node.children, child)
return child
}
func (node *schemaNode) children() []*schemaNode { return node.childrens }
func (node *schemaNode) Children() []*schemaNode { return node.children }

func (node *schemaNode) name() string { return node.fieldname }
func (node *schemaNode) Name() string { return node.name }

// ArrowSchemaFromAvro returns a new Arrow schema from an Avro schema
func ArrowSchemaFromAvro(schema avro.Schema) (s *arrow.Schema, err error) {
Expand All @@ -70,7 +74,7 @@ func ArrowSchemaFromAvro(schema avro.Schema) (s *arrow.Schema, err error) {
c := n.newChild(n.schema.(avro.NamedSchema).Name(), n.schema)
arrowSchemafromAvro(c)
var fields []arrow.Field
for _, g := range c.children() {
for _, g := range c.Children() {
fields = append(fields, g.arrowField)
}
s = arrow.NewSchema(fields, nil)
Expand All @@ -79,7 +83,7 @@ func ArrowSchemaFromAvro(schema avro.Schema) (s *arrow.Schema, err error) {

func arrowSchemafromAvro(n *schemaNode) {
if ns, ok := n.schema.(avro.NamedSchema); ok {
schemaCache.Add(ns.Name(), ns)
n.schemaCache.Add(ns.Name(), ns)
}
switch st := n.schema.Type(); st {
case "record":
Expand All @@ -100,20 +104,20 @@ func arrowSchemafromAvro(n *schemaNode) {
case sl > math.MaxUint16 && sl <= math.MaxUint32:
dt.IndexType = arrow.PrimitiveTypes.Uint32
}
n.arrowField = arrow.Field{Name: n.name(), Type: &dt, Nullable: true, Metadata: arrow.MetadataFrom(symbols)}
n.arrowField = arrow.Field{Name: n.Name(), Type: &dt, Nullable: true, Metadata: arrow.MetadataFrom(symbols)}
case "array":
// logical items type
c := n.newChild(n.fieldname, n.schema.(*avro.ArraySchema).Items())
c := n.newChild(n.name, n.schema.(*avro.ArraySchema).Items())
if isLogicalSchemaType(n.schema.(*avro.ArraySchema).Items()) {
avroLogicalToArrowField(c)
} else {
arrowSchemafromAvro(c)
}
n.arrowField = arrow.Field{Name: n.fieldname, Type: arrow.ListOf(c.arrowField.Type)}
n.arrowField = arrow.Field{Name: n.name, Type: arrow.ListOf(c.arrowField.Type)}
case "map":
c := n.newChild(n.fieldname, n.schema.(*avro.MapSchema).Values())
c := n.newChild(n.name, n.schema.(*avro.MapSchema).Values())
arrowSchemafromAvro(c)
n.arrowField = arrow.Field{Name: n.fieldname, Type: arrow.MapOf(arrow.BinaryTypes.String, c.arrowField.Type), Metadata: c.arrowField.Metadata, Nullable: true}
n.arrowField = arrow.Field{Name: n.name, Type: arrow.MapOf(arrow.BinaryTypes.String, c.arrowField.Type), Metadata: c.arrowField.Metadata, Nullable: true}
case "union":
if n.schema.(*avro.UnionSchema).Nullable() {
if len(n.schema.(*avro.UnionSchema).Types()) > 1 {
Expand All @@ -127,43 +131,43 @@ func arrowSchemafromAvro(n *schemaNode) {
if isLogicalSchemaType(n.schema) {
avroLogicalToArrowField(n)
} else {
n.arrowField = arrow.Field{Name: n.name(), Type: &arrow.FixedSizeBinaryType{ByteWidth: n.schema.(*avro.FixedSchema).Size()}, Nullable: true}
n.arrowField = arrow.Field{Name: n.Name(), Type: &arrow.FixedSizeBinaryType{ByteWidth: n.schema.(*avro.FixedSchema).Size()}, Nullable: true}
}
case "string":
if isLogicalSchemaType(n.schema) {
avroLogicalToArrowField(n)
} else {
n.arrowField = arrow.Field{Name: n.fieldname, Type: avroPrimitiveToArrowType(string(st)), Nullable: true}
n.arrowField = arrow.Field{Name: n.name, Type: avroPrimitiveToArrowType(string(st)), Nullable: true}
}
case "bytes":
if isLogicalSchemaType(n.schema) {
avroLogicalToArrowField(n)
} else {
n.arrowField = arrow.Field{Name: n.fieldname, Type: avroPrimitiveToArrowType(string(st)), Nullable: true}
n.arrowField = arrow.Field{Name: n.name, Type: avroPrimitiveToArrowType(string(st)), Nullable: true}
}
case "int":
if isLogicalSchemaType(n.schema) {
avroLogicalToArrowField(n)
} else {
n.arrowField = arrow.Field{Name: n.fieldname, Type: avroPrimitiveToArrowType(string(st)), Nullable: true}
n.arrowField = arrow.Field{Name: n.name, Type: avroPrimitiveToArrowType(string(st)), Nullable: true}
}
case "long":
if isLogicalSchemaType(n.schema) {
avroLogicalToArrowField(n)
} else {
n.arrowField = arrow.Field{Name: n.fieldname, Type: avroPrimitiveToArrowType(string(st)), Nullable: true}
n.arrowField = arrow.Field{Name: n.name, Type: avroPrimitiveToArrowType(string(st)), Nullable: true}
}
case "float":
n.arrowField = arrow.Field{Name: n.fieldname, Type: avroPrimitiveToArrowType(string(st)), Nullable: true}
n.arrowField = arrow.Field{Name: n.name, Type: avroPrimitiveToArrowType(string(st)), Nullable: true}
case "double":
n.arrowField = arrow.Field{Name: n.fieldname, Type: avroPrimitiveToArrowType(string(st)), Nullable: true}
n.arrowField = arrow.Field{Name: n.name, Type: avroPrimitiveToArrowType(string(st)), Nullable: true}
case "boolean":
n.arrowField = arrow.Field{Name: n.fieldname, Type: avroPrimitiveToArrowType(string(st)), Nullable: true}
n.arrowField = arrow.Field{Name: n.name, Type: avroPrimitiveToArrowType(string(st)), Nullable: true}
case "<ref>":
n.schema = schemaCache.Get(n.schema.(*avro.RefSchema).Schema().Name())
n.schema = n.schemaCache.Get(n.schema.(*avro.RefSchema).Schema().Name())
arrowSchemafromAvro(n)
case "null":
n.arrowField = arrow.Field{Name: n.fieldname, Type: arrow.BinaryTypes.Binary, Nullable: true}
n.arrowField = arrow.Field{Name: n.name, Type: arrow.Null, Nullable: true}
}
}

Expand All @@ -173,18 +177,18 @@ func iterateFields(n *schemaNode) {
switch ft := f.Type().(type) {
// Avro "array" field type
case *avro.ArraySchema:
schemaCache.Add(f.Name(), ft.Items())
n.schemaCache.Add(f.Name(), ft.Items())
// logical items type
c := n.newChild(f.Name(), ft.Items())
if isLogicalSchemaType(ft.Items()) {
avroLogicalToArrowField(c)
} else {
arrowSchemafromAvro(c)
}
c.arrowField = arrow.Field{Name: c.fieldname, Type: arrow.ListOf(c.arrowField.Type), Metadata: c.arrowField.Metadata, Nullable: true}
c.arrowField = arrow.Field{Name: c.name, Type: arrow.ListOf(c.arrowField.Type), Metadata: c.arrowField.Metadata, Nullable: true}
// Avro "enum" field type = Arrow dictionary type
case *avro.EnumSchema:
schemaCache.Add(f.Name(), f.Type())
n.schemaCache.Add(f.Name(), f.Type())
c := n.newChild(f.Name(), f.Type())
symbols := make(map[string]string)
for index, symbol := range ft.Symbols() {
Expand All @@ -204,7 +208,7 @@ func iterateFields(n *schemaNode) {
c.arrowField = arrow.Field{Name: f.Name(), Type: &dt, Nullable: true, Metadata: arrow.MetadataFrom(symbols)}
// Avro "fixed" field type = Arrow FixedSize Primitive BinaryType
case *avro.FixedSchema:
schemaCache.Add(f.Name(), f.Type())
n.schemaCache.Add(f.Name(), f.Type())
if isLogicalSchemaType(f.Type()) {
c := n.newChild(f.Name(), f.Type())
avroLogicalToArrowField(c)
Expand All @@ -213,12 +217,12 @@ func iterateFields(n *schemaNode) {
arrowSchemafromAvro(c)
}
case *avro.RecordSchema:
schemaCache.Add(f.Name(), f.Type())
n.schemaCache.Add(f.Name(), f.Type())
c := n.newChild(f.Name(), f.Type())
iterateFields(c)
// Avro "map" field type - KVP with value of one type - keys are strings
case *avro.MapSchema:
schemaCache.Add(f.Name(), ft.Values())
n.schemaCache.Add(f.Name(), ft.Values())
c := n.newChild(f.Name(), ft.Values())
arrowSchemafromAvro(c)
if ft.Values().Type() == "union" {
Expand All @@ -229,17 +233,16 @@ func iterateFields(n *schemaNode) {
case *avro.UnionSchema:
if ft.Nullable() {
if len(ft.Types()) > 1 {
schemaCache.Add(f.Name(), ft.Types()[1])
n.schemaCache.Add(f.Name(), ft.Types()[1])
c := n.newChild(f.Name(), ft.Types()[1])
c.union = true
arrowSchemafromAvro(c)
}
}
default:
schemaCache.Add(f.Name(), f.Type())
n.schemaCache.Add(f.Name(), f.Type())
if isLogicalSchemaType(f.Type()) {
c := n.newChild(f.Name(), f.Type())
// c.arrowField =
avroLogicalToArrowField(c)
} else {
c := n.newChild(f.Name(), f.Type())
Expand All @@ -249,15 +252,15 @@ func iterateFields(n *schemaNode) {
}
}
var fields []arrow.Field
for _, child := range n.children() {
for _, child := range n.Children() {
fields = append(fields, child.arrowField)
}

namedSchema, ok := isNamedSchema(n.schema)
if !ok || namedSchema == n.fieldname+"_data" || !n.union {
n.arrowField = arrow.Field{Name: n.fieldname, Type: arrow.StructOf(fields...), Nullable: true}
if !ok || namedSchema == n.name+"_data" || !n.union {
n.arrowField = arrow.Field{Name: n.name, Type: arrow.StructOf(fields...), Nullable: true}
} else {
n.arrowField = arrow.Field{Name: n.fieldname, Type: arrow.StructOf(fields...), Nullable: true, Metadata: arrow.MetadataFrom(map[string]string{"typeName": namedSchema})}
n.arrowField = arrow.Field{Name: n.name, Type: arrow.StructOf(fields...), Nullable: true, Metadata: arrow.MetadataFrom(map[string]string{"typeName": namedSchema})}
}

}
Expand Down Expand Up @@ -306,11 +309,8 @@ func avroPrimitiveToArrowType(avroFieldType string) arrow.DataType {
// string: unicode character sequence
case "string":
return arrow.BinaryTypes.String
// fallback to binary type for any unsupported type
// (shouldn't happen as all types specified in avro 1.11.1 are supported)
default:
return arrow.BinaryTypes.Binary
}
return nil
}

func avroLogicalToArrowField(n *schemaNode) arrow.Field {
Expand All @@ -327,7 +327,7 @@ func avroLogicalToArrowField(n *schemaNode) arrow.Field {
// precision, a JSON integer representing the (maximum) precision of decimals stored in this type (required).
case "decimal":
id := arrow.DECIMAL128
if lt.(*avro.DecimalLogicalSchema).Precision() > 38 {
if lt.(*avro.DecimalLogicalSchema).Precision() > decimal128.MaxPrecision {
id = arrow.DECIMAL256
}
dt, _ = arrow.NewDecimalType(id, int32(lt.(*avro.DecimalLogicalSchema).Precision()), int32(lt.(*avro.DecimalLogicalSchema).Scale()))
Expand Down Expand Up @@ -378,6 +378,22 @@ func avroLogicalToArrowField(n *schemaNode) arrow.Field {
case "timestamp-micros":
dt = arrow.FixedWidthTypes.Timestamp_us

// The local-timestamp-millis logical type represents a timestamp in a local timezone, regardless of
// what specific time zone is considered local, with a precision of one millisecond.
// A local-timestamp-millis logical type annotates an Avro long, where the long stores the number of
// milliseconds, from 1 January 1970 00:00:00.000.
// Note: not implemented in hamba/avro
// case "local-timestamp-millis":
// dt = &arrow.TimestampType{Unit: arrow.Millisecond}

// The local-timestamp-micros logical type represents a timestamp in a local timezone, regardless of
// what specific time zone is considered local, with a precision of one microsecond.
// A local-timestamp-micros logical type annotates an Avro long, where the long stores the number of
// microseconds, from 1 January 1970 00:00:00.000000.
// case "local-timestamp-micros":
// Note: not implemented in hamba/avro
// dt = &arrow.TimestampType{Unit: arrow.Microsecond}

// The duration logical type represents an amount of time defined by a number of months, days and milliseconds.
// This is not equivalent to a number of milliseconds, because, depending on the moment in time from which the
// duration is measured, the number of days in the month and number of milliseconds in a day may differ. Other
Expand All @@ -389,6 +405,6 @@ func avroLogicalToArrowField(n *schemaNode) arrow.Field {
case "duration":
dt = arrow.FixedWidthTypes.MonthDayNanoInterval
}
n.arrowField = arrow.Field{Name: n.fieldname, Type: dt, Nullable: true}
return arrow.Field{Name: n.fieldname, Type: dt, Nullable: true}
n.arrowField = arrow.Field{Name: n.name, Type: dt, Nullable: true}
return arrow.Field{Name: n.name, Type: dt, Nullable: true}
}

0 comments on commit a7acd32

Please sign in to comment.