Skip to content

Commit

Permalink
Add list support to json reader.
Browse files Browse the repository at this point in the history
  • Loading branch information
cube2222 committed Jul 30, 2023
1 parent e9548fa commit 9b779e6
Showing 1 changed file with 32 additions and 5 deletions.
37 changes: 32 additions & 5 deletions arrowexec/json/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func recordReader(schema *arrow.Schema, recordBuilder *array.RecordBuilder) (Val
readers := make([]ValueReaderFunc, len(schema.Fields()))
for i, field := range fields {
var err error
readers[i], err = valueReader(field, recordBuilder.Field(i))
readers[i], err = fieldReader(field, recordBuilder.Field(i))
if err != nil {
return nil, fmt.Errorf("couldn't create value reader for field %v: %w", field.Name, err)
}
Expand All @@ -88,9 +88,9 @@ func recordReader(schema *arrow.Schema, recordBuilder *array.RecordBuilder) (Val
}, nil
}

func valueReader(dt arrow.Field, builder array.Builder) (ValueReaderFunc, error) {
func fieldReader(field arrow.Field, builder array.Builder) (ValueReaderFunc, error) {
var reader ValueReaderFunc
switch dt.Type.ID() {
switch field.Type.ID() {
case arrow.BOOL:
reader = boolReader(builder)
case arrow.INT64:
Expand All @@ -99,12 +99,18 @@ func valueReader(dt arrow.Field, builder array.Builder) (ValueReaderFunc, error)
reader = floatReader(builder)
case arrow.STRING:
reader = stringReader(builder)
case arrow.LIST:
var err error
reader, err = listReader(field.Type.(*arrow.ListType).ElemField(), builder)
if err != nil {
return nil, fmt.Errorf("couldn't construct list reader: %w", err)
}
// TODO: Handle structs, lists, and unions.
default:
return nil, fmt.Errorf("unsupported type: %v", dt)
return nil, fmt.Errorf("unsupported type: %v", field)
}

if dt.Nullable {
if field.Nullable {
reader = nullableReader(builder, reader)
}
return reader, nil
Expand Down Expand Up @@ -158,6 +164,27 @@ func stringReader(builder array.Builder) ValueReaderFunc {
}
}

func listReader(field arrow.Field, builder array.Builder) (ValueReaderFunc, error) {
listBuilder := builder.(*array.ListBuilder)
valueReader, err := fieldReader(field, listBuilder.ValueBuilder())
if err != nil {
panic(err)
}
return func(value *fastjson.Value) error {
listBuilder.Append(true)
list, err := value.Array()
if err != nil {
return fmt.Errorf("couldn't read json array: %w", err)
}
for i := 0; i < len(list); i++ {
if err := valueReader(list[i]); err != nil {
return fmt.Errorf("couldn't read list element: %w", err)
}
}
return nil
}, nil
}

func nullableReader(builder array.Builder, reader ValueReaderFunc) ValueReaderFunc {
return func(value *fastjson.Value) error {
if value == nil || value.Type() == fastjson.TypeNull {
Expand Down

0 comments on commit 9b779e6

Please sign in to comment.