Skip to content

Commit

Permalink
Getting more stuff to work.
Browse files Browse the repository at this point in the history
  • Loading branch information
cube2222 committed Aug 10, 2023
1 parent d8e318c commit cb2e988
Show file tree
Hide file tree
Showing 8 changed files with 1,239 additions and 1,188 deletions.
48 changes: 48 additions & 0 deletions arrowexec/aggregates/sum.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package aggregates

import (
"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/apache/arrow/go/v13/arrow/bitutil"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/cube2222/octosql/arrowexec/nodes"
"github.com/cube2222/octosql/octosql"
"github.com/cube2222/octosql/physical"
)

// SumOverloads lists the typed overloads of the "sum" aggregate that the
// physical planner can pick from. Only the Int -> Int overload is
// implemented so far.
var SumOverloads = []physical.AggregateDescriptor{
{
ArgumentType: octosql.Int,
OutputType: octosql.Int,
Prototype: NewSumIntPrototype(),
},
}

// NewSumIntPrototype returns a constructor for int64 sum aggregates.
// Every call of the returned function yields a fresh, independent SumInt
// backed by its own resizable buffer.
//
// TODO: Implement for nullable, probably a wrapping aggregate column consumer that just ignores nulls. Actually, that wrapper would set the null bitmap.
func NewSumIntPrototype() func() nodes.Aggregate {
	return func() nodes.Aggregate {
		agg := &SumInt{}
		agg.data = memory.NewResizableBuffer(memory.NewGoAllocator())
		return agg
	}
}

// SumInt accumulates int64 sums, one running total per aggregation entry.
// state is a typed view over data's raw bytes, so resizing data and
// re-casting keeps both in sync without copying.
type SumInt struct {
data *memory.Buffer
state []int64 // This uses the above data as the storage underneath.
}

// MakeColumnConsumer returns a closure that adds arr's value at rowIndex
// into the running total stored at entryIndex. The state storage grows on
// demand to the next power of two. arr must be an *array.Int64.
func (agg *SumInt) MakeColumnConsumer(arr arrow.Array) func(entryIndex uint, rowIndex uint) {
	values := arr.(*array.Int64).Int64Values()
	return func(entryIndex uint, rowIndex uint) {
		if uint(len(agg.state)) <= entryIndex {
			// Grow the buffer, then re-view its raw bytes as []int64.
			newLen := bitutil.NextPowerOf2(int(entryIndex) + 1)
			agg.data.Resize(arrow.Int64Traits.BytesRequired(newLen))
			agg.state = arrow.Int64Traits.CastFromBytes(agg.data.Bytes())
		}
		agg.state[entryIndex] += values[rowIndex]
	}
}

// GetBatch returns `length` accumulated sums starting at `offset` as an
// Int64 array that shares agg's underlying buffer (no copy is made).
// NOTE(review): the validity bitmap buffer is nil and the null count is
// hard-coded to 0 (see the inline TODO) — nulls are not represented yet.
func (agg *SumInt) GetBatch(length int, offset int) arrow.Array {
return array.NewInt64Data(array.NewData(arrow.PrimitiveTypes.Int64, length, []*memory.Buffer{nil, agg.data}, nil, 0 /*TODO: Fixme*/, offset))
}
4 changes: 4 additions & 0 deletions arrowexec/aggregates/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,8 @@ var Aggregates = map[string]physical.AggregateDetails{
Description: "Counts all items in the group.",
Descriptors: CountOverloads,
},
"sum": {
Description: "Sums all items in the group.",
Descriptors: SumOverloads,
},
}
67 changes: 49 additions & 18 deletions arrowexec/execution/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"fmt"

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/apache/arrow/go/v13/arrow/compute"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/apache/arrow/go/v13/arrow/scalar"
)
Expand Down Expand Up @@ -50,33 +48,66 @@ func (c *ConstArray) Evaluate(ctx Context, record Record) (arrow.Array, error) {
//
// }

type ArrowComputeFunctionCall struct {
Name string
Arguments []Expression
// FunctionCall is an Expression that evaluates its argument expressions
// and applies function to the resulting arrays.
type FunctionCall struct {
function func([]arrow.Array) (arrow.Array, error)
args []Expression
nullCheckIndices []int // presumably the argument indices to null-check when strict — not read in the visible code, TODO confirm
strict bool
}

func (f *ArrowComputeFunctionCall) Evaluate(ctx Context, record Record) (arrow.Array, error) {
// TODO: Optimize all of this if it's too slow.
args := make([]compute.Datum, len(f.Arguments))
for i, arg := range f.Arguments {
// NewFunctionCall builds a FunctionCall expression from the given function,
// its argument expressions, the argument indices to null-check, and the
// strictness flag.
func NewFunctionCall(function func([]arrow.Array) (arrow.Array, error), args []Expression, nullCheckIndices []int, strict bool) *FunctionCall {
	call := &FunctionCall{}
	call.function = function
	call.args = args
	call.nullCheckIndices = nullCheckIndices
	call.strict = strict
	return call
}

func (f *FunctionCall) Evaluate(ctx Context, record Record) (arrow.Array, error) {
args := make([]arrow.Array, len(f.args))
for i, arg := range f.args {
arr, err := arg.Evaluate(ctx, record)
if err != nil {
return nil, fmt.Errorf("couldn't evaluate argument %d: %w", i, err)
}
args[i] = &compute.ArrayDatum{Value: arr.Data()}
args[i] = arr
}

fn, ok := compute.GetFunctionRegistry().GetFunction(f.Name)
if !ok {
panic("Bug: array function not found")
if f.strict {
// TODO: Handle validity bitmaps.
}
out, err := fn.Execute(ctx.Context, nil, args...)
if err != nil {
return nil, fmt.Errorf("couldn't execute function: %w", err)
}
return array.MakeFromData(out.(*compute.ArrayDatum).Value), nil

return f.function(args)
}

// type ArrowComputeFunctionCall struct {
// Name string
// Arguments []Expression
// }
//
// func (f *ArrowComputeFunctionCall) Evaluate(ctx Context, record Record) (arrow.Array, error) {
// // TODO: Optimize all of this if it's too slow.
// args := make([]compute.Datum, len(f.Arguments))
// for i, arg := range f.Arguments {
// arr, err := arg.Evaluate(ctx, record)
// if err != nil {
// return nil, fmt.Errorf("couldn't evaluate argument %d: %w", i, err)
// }
// args[i] = &compute.ArrayDatum{Value: arr.Data()}
// }
//
// fn, ok := compute.GetFunctionRegistry().GetFunction(f.Name)
// if !ok {
// panic("Bug: array function not found")
// }
// out, err := fn.Execute(ctx.Context, nil, args...)
// if err != nil {
// return nil, fmt.Errorf("couldn't execute function: %w", err)
// }
// return array.MakeFromData(out.(*compute.ArrayDatum).Value), nil
// }

// Constant is an Expression wrapping a single scalar Value.
// (Its Evaluate method is below the visible portion of the diff.)
type Constant struct {
Value scalar.Scalar
}
Expand Down
4 changes: 4 additions & 0 deletions arrowexec/json/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,10 @@ func floatReader(builder array.Builder) ValueReaderFunc {
func stringReader(builder array.Builder) ValueReaderFunc {
stringBuilder := builder.(*array.StringBuilder)
return func(value *fastjson.Value) error {
if value == nil { // TODO: This should only be activated if field is null?
stringBuilder.AppendNull()
return nil
}
v, err := value.StringBytes()
if err != nil {
return fmt.Errorf("couldn't read string: %w", err)
Expand Down
32 changes: 0 additions & 32 deletions arrowexec/nodes/group_by.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,38 +139,6 @@ type Aggregate interface {
GetBatch(length int, offset int) arrow.Array
}

// MakeSum returns a sum Aggregate for the given Arrow data type.
// Only INT64 is supported; any other type panics.
func MakeSum(dt arrow.DataType) Aggregate { // TODO: octosql.Type?
switch dt.ID() {
case arrow.INT64:
return &SumInt{
data: memory.NewResizableBuffer(memory.NewGoAllocator()), // TODO: Get allocator as argument.
}
default:
panic("unsupported type for sum")
}
// TODO: Implement for nullable, probably a wrapping aggregate column consumer that just ignores nulls. Actually, that wrapper would set the null bitmap.
}

// SumInt accumulates int64 sums, one running total per aggregation entry.
// state is a typed view over data's raw bytes, kept in sync by re-casting
// after every resize.
type SumInt struct {
data *memory.Buffer
state []int64 // This uses the above data as the storage underneath.
}

// MakeColumnConsumer returns a closure that adds arr's value at rowIndex
// into the running total at entryIndex, growing the state storage
// (power-of-two) on demand. arr must be an *array.Int64.
func (agg *SumInt) MakeColumnConsumer(arr arrow.Array) func(entryIndex uint, rowIndex uint) {
typedArr := arr.(*array.Int64).Int64Values()
return func(entryIndex uint, rowIndex uint) {
if entryIndex >= uint(len(agg.state)) {
// Grow the buffer to fit entryIndex, then re-view its bytes as []int64.
agg.data.Resize(arrow.Int64Traits.BytesRequired(bitutil.NextPowerOf2(int(entryIndex) + 1)))
agg.state = arrow.Int64Traits.CastFromBytes(agg.data.Bytes())
}
agg.state[entryIndex] += typedArr[rowIndex]
}
}

// GetBatch returns `length` accumulated sums starting at `offset` as an
// Int64 array sharing agg's buffer. NOTE(review): the validity bitmap is
// nil and the null count is hard-coded to 0 (see the inline TODO).
func (agg *SumInt) GetBatch(length int, offset int) arrow.Array {
return array.NewInt64Data(array.NewData(arrow.PrimitiveTypes.Int64, length, []*memory.Buffer{nil, agg.data}, nil, 0 /*TODO: Fixme*/, offset))
}

func MakeKey(dt arrow.DataType) Key {
switch dt.ID() {
case arrow.INT64:
Expand Down
Loading

0 comments on commit cb2e988

Please sign in to comment.